diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java index e5fa0a03d621..dbcd271cccfb 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java @@ -102,6 +102,7 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor private final Closer closer = Closer.create(); private Cursor cursor; + private Closeable cursorCloser; private Segment segment; private final SimpleSettableOffset cursorOffset = new SimpleAscendingOffset(Integer.MAX_VALUE); private FrameWriter frameWriter; @@ -156,6 +157,7 @@ public ReturnOrAwait runIncrementally(final IntSet readableInputs) throw @Override public void cleanup() throws IOException { + closer.register(cursorCloser); closer.register(frameWriter); closer.register(super::cleanup); closer.close(); @@ -221,7 +223,7 @@ protected ReturnOrAwait runWithDataServerQuery(final DataSer cursorYielder.close(); return ReturnOrAwait.returnObject(handedOffSegments); } else { - final long rowsFlushed = setNextCursor(cursorYielder.get(), null); + final long rowsFlushed = setNextCursor(cursorYielder.get(), null, null); closer.register(cursorYielder); if (rowsFlushed > 0) { return ReturnOrAwait.runAgain(); @@ -256,16 +258,16 @@ protected ReturnOrAwait runWithSegment(final SegmentWithDescriptor segment ); } - final CursorHolder cursorHolder = closer.register( - cursorFactory.makeCursorHolder(ScanQueryEngine.makeCursorBuildSpec(query, null)) - ); - final Cursor nextCursor = cursorHolder.asCursor(); + final CursorHolder nextCursorHolder = + cursorFactory.makeCursorHolder(ScanQueryEngine.makeCursorBuildSpec(query, null)); + final Cursor nextCursor = nextCursorHolder.asCursor(); if (nextCursor == null) { // No cursors! + nextCursorHolder.close(); return ReturnOrAwait.returnObject(Unit.instance()); } else { - final long rowsFlushed = setNextCursor(nextCursor, segmentHolder.get().getSegment()); + final long rowsFlushed = setNextCursor(nextCursor, nextCursorHolder, segmentHolder.get().getSegment()); assert rowsFlushed == 0; // There's only ever one cursor when running with a segment } } @@ -302,16 +304,16 @@ protected ReturnOrAwait runWithInputChannel( ); } - final CursorHolder cursorHolder = closer.register( - cursorFactory.makeCursorHolder(ScanQueryEngine.makeCursorBuildSpec(query, null)) - ); - final Cursor nextCursor = cursorHolder.asCursor(); + final CursorHolder nextCursorHolder = + cursorFactory.makeCursorHolder(ScanQueryEngine.makeCursorBuildSpec(query, null)); + final Cursor nextCursor = nextCursorHolder.asCursor(); if (nextCursor == null) { // no cursor + nextCursorHolder.close(); return ReturnOrAwait.returnObject(Unit.instance()); } - final long rowsFlushed = setNextCursor(nextCursor, frameSegment); + final long rowsFlushed = setNextCursor(nextCursor, nextCursorHolder, frameSegment); if (rowsFlushed > 0) { return ReturnOrAwait.runAgain(); @@ -415,10 +417,20 @@ private long flushFrameWriter() throws IOException } } - private long setNextCursor(final Cursor cursor, final Segment segment) throws IOException + private long setNextCursor( + final Cursor cursor, + @Nullable final Closeable cursorCloser, + final Segment segment + ) throws IOException { final long rowsFlushed = flushFrameWriter(); + if (this.cursorCloser != null) { + // Close here, don't add to the processor-level Closer, to avoid leaking CursorHolders. We may generate many + // CursorHolders per instance of this processor, and we need to close them as we go, not all at the end. + this.cursorCloser.close(); + } this.cursor = cursor; + this.cursorCloser = cursorCloser; this.segment = segment; this.cursorOffset.reset(); return rowsFlushed;