From ab10354ddc5b235a74dd1b1336f0f09c27ca3f6f Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Wed, 31 Jul 2024 15:56:55 -0700 Subject: [PATCH] fixup --- .../query/FrameBasedInlineDataSource.java | 17 ++++++++++------- .../join/HashJoinSegmentStorageAdapter.java | 4 +--- .../join/table/FrameBasedIndexedTable.java | 8 ++++---- .../druid/frame/testutil/FrameTestUtil.java | 18 +++++++++--------- 4 files changed, 24 insertions(+), 23 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/FrameBasedInlineDataSource.java b/processing/src/main/java/org/apache/druid/query/FrameBasedInlineDataSource.java index afdf67fdac35..6c073c36234a 100644 --- a/processing/src/main/java/org/apache/druid/query/FrameBasedInlineDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/FrameBasedInlineDataSource.java @@ -26,11 +26,13 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.CursorBuildSpec; +import org.apache.druid.segment.CursorMaker; import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.column.RowSignature; @@ -80,6 +82,7 @@ public RowSignature getRowSignature() public Sequence getRowsAsSequence() { + final Closer closer = Closer.create(); final Sequence cursorSequence = Sequences.simple(frames) .map( @@ -87,14 +90,14 @@ public Sequence getRowsAsSequence() Frame frame = frameSignaturePair.getFrame(); RowSignature frameSignature = frameSignaturePair.getRowSignature(); FrameReader frameReader = FrameReader.create(frameSignature); - // currently FrameStorageAdapter cursor maker doesn't have any resources which need closed, but - // if this changes we need to register this cursor maker with a closer as the baggage of the - // sequence - return new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY).asCursorMaker( - CursorBuildSpec.FULL_SCAN - ).makeCursor(); + final CursorMaker maker = closer.register( + new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY).asCursorMaker( + CursorBuildSpec.FULL_SCAN + ) + ); + return maker.makeCursor(); } - ); + ).withBaggage(closer); return cursorSequence.flatMap( (cursor) -> { diff --git a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java index 9f87694c0be1..143831a232db 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java @@ -229,9 +229,7 @@ public CursorMaker asCursorMaker(CursorBuildSpec spec) if (clauses.isEmpty()) { - // HashJoinEngine isn't vectorized yet. - // However, we can still vectorize if there are no clauses, since that means all we need to do is apply - // a base filter. That's easy enough! + // if there are no clauses, we can just use the base cursor directly if we apply the combined filter final CursorBuildSpec newSpec = cursorBuildSpecBuilder.setFilter(combinedFilter) .setVirtualColumns(spec.getVirtualColumns()) .build(); diff --git a/processing/src/main/java/org/apache/druid/segment/join/table/FrameBasedIndexedTable.java b/processing/src/main/java/org/apache/druid/segment/join/table/FrameBasedIndexedTable.java index 1824f4a810d5..e47d786629de 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/table/FrameBasedIndexedTable.java +++ b/processing/src/main/java/org/apache/druid/segment/join/table/FrameBasedIndexedTable.java @@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.FrameBasedInlineDataSource; import org.apache.druid.query.FrameSignaturePair; @@ -113,6 +114,7 @@ public FrameBasedIndexedTable( indexBuilders.add(m); } + final Closer closer = Closer.create(); final Sequence cursors = Sequences.simple( frameBasedInlineDataSource .getFrames() @@ -122,12 +124,10 @@ public FrameBasedIndexedTable( RowSignature rowSignature = frameSignaturePair.getRowSignature(); FrameStorageAdapter frameStorageAdapter = new FrameStorageAdapter(frame, FrameReader.create(rowSignature), Intervals.ETERNITY); - // currently FrameStorageAdapter cursor maker doesn't have any resources which need closed, but if this - // changes we need to register this cursor maker with a closer as the baggage of the sequence - return frameStorageAdapter.asCursorMaker(CursorBuildSpec.FULL_SCAN).makeCursor(); + return closer.register(frameStorageAdapter.asCursorMaker(CursorBuildSpec.FULL_SCAN)).makeCursor(); }) .collect(Collectors.toList()) - ); + ).withBaggage(closer); final Sequence sequence = Sequences.map( cursors, diff --git a/processing/src/test/java/org/apache/druid/frame/testutil/FrameTestUtil.java b/processing/src/test/java/org/apache/druid/frame/testutil/FrameTestUtil.java index c16197b0e0f3..188186bcbc83 100644 --- a/processing/src/test/java/org/apache/druid/frame/testutil/FrameTestUtil.java +++ b/processing/src/test/java/org/apache/druid/frame/testutil/FrameTestUtil.java @@ -232,15 +232,15 @@ public static Sequence> readRowsFromFrameChannel( { return new FrameChannelSequence(channel) .flatMap( - frame -> - readRowsFromCursor( - // if FrameStorageAdapter.asCursorMaker ever needs closing.. this needs to change to add to a - // closer that is tied to baggage of this sequence... - new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY) - .asCursorMaker(CursorBuildSpec.FULL_SCAN) - .makeCursor(), - frameReader.signature() - ) + frame -> { + final CursorMaker maker = new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY).asCursorMaker( + CursorBuildSpec.FULL_SCAN + ); + return readRowsFromCursor( + maker.makeCursor(), + frameReader.signature() + ).withBaggage(maker); + } ); }