From 6c327755c5da8f1a308ffc3cbd3453cf63484a65 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 26 Jul 2024 12:39:27 -0700 Subject: [PATCH] split out CursorMakerFactory interface, adjustments --- ...bjectVectorColumnProcessorFactoryTest.java | 8 +- .../SegmentGeneratorFrameProcessor.java | 3 + .../results/ExportResultsFrameProcessor.java | 4 + .../hadoop/DatasourceRecordReader.java | 180 +++++++++--------- .../indexing/input/DruidSegmentReader.java | 4 + .../apache/druid/frame/field/RowReader.java | 4 +- .../apache/druid/frame/read/FrameReader.java | 12 +- .../druid/frame/segment/FrameCursor.java | 5 +- .../frame/segment/FrameStorageAdapter.java | 6 +- ...tory.java => FrameCursorMakerFactory.java} | 10 +- .../segment/columnar/FrameQueryableIndex.java | 2 +- ...tory.java => FrameCursorMakerFactory.java} | 10 +- .../LazilyDecoratedRowsAndColumns.java | 4 + .../StorageAdapterRowsAndColumns.java | 4 + .../druid/query/scan/ScanQueryEngine.java | 3 + .../TimeBoundaryQueryRunnerFactory.java | 2 +- .../apache/druid/segment/CursorFactory.java | 3 +- .../druid/segment/CursorMakerFactory.java | 25 +++ .../druid/segment/UnnestStorageAdapter.java | 5 +- .../join/HashJoinSegmentStorageAdapter.java | 4 +- .../table/BroadcastSegmentIndexedTable.java | 5 + .../druid/query/scan/ScanQueryTest.java | 4 +- .../org/apache/druid/cli/DumpSegment.java | 3 + 23 files changed, 183 insertions(+), 127 deletions(-) rename processing/src/main/java/org/apache/druid/frame/segment/columnar/{FrameCursorFactory.java => FrameCursorMakerFactory.java} (95%) rename processing/src/main/java/org/apache/druid/frame/segment/row/{FrameCursorFactory.java => FrameCursorMakerFactory.java} (91%) create mode 100644 processing/src/main/java/org/apache/druid/segment/CursorMakerFactory.java diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactoryTest.java index 7c7a7466c9fb..68b2b82e8c6d 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactoryTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactoryTest.java @@ -57,7 +57,9 @@ public void setUp() @Test public void testRead() { - try (final CursorMaker maker = makeCursor(); final VectorCursor cursor = maker.makeVectorCursor()) { + try (final CursorMaker maker = makeCursor(); + final VectorCursor cursor = maker.makeVectorCursor() + ) { final Supplier qualitySupplier = ColumnProcessors.makeVectorProcessor( "quality", ToObjectVectorColumnProcessorFactory.INSTANCE, @@ -184,7 +186,9 @@ private CursorMaker makeCursor() private List readColumn(final String column, final int limit) { - try (final CursorMaker maker = makeCursor(); final VectorCursor cursor = maker.makeVectorCursor()) { + try (final CursorMaker maker = makeCursor(); + final VectorCursor cursor = maker.makeVectorCursor() + ) { final Supplier supplier = ColumnProcessors.makeVectorProcessor( column, ToObjectVectorColumnProcessorFactory.INSTANCE, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessor.java index 911d13579583..e865137d7263 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessor.java @@ -184,6 +184,9 @@ private void addFrame(final Frame frame) final StorageAdapter adapter = new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY); try (final CursorMaker maker = adapter.asCursorMaker(CursorBuildSpec.FULL_SCAN)) { final Cursor cursor = maker.makeCursor(); + if (cursor == null) { + return; + } final ColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory(); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java index f54f8336f97d..52fe8a9d6c3c 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java @@ -154,6 +154,10 @@ private void exportFrame(final Frame frame) final StorageAdapter adapter = new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY); try (final CursorMaker maker = adapter.asCursorMaker(CursorBuildSpec.FULL_SCAN)) { final Cursor cursor = maker.makeCursor(); + if (cursor == null) { + exportWriter.writeRowEnd(); + return; + } final ColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory(); //noinspection rawtypes diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/hadoop/DatasourceRecordReader.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/hadoop/DatasourceRecordReader.java index 40a67f1236e0..5ddbef8842bf 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/hadoop/DatasourceRecordReader.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/hadoop/DatasourceRecordReader.java @@ -44,10 +44,11 @@ import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.CursorBuildSpec; +import org.apache.druid.segment.CursorMaker; import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexStorageAdapter; -import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.data.IndexedInts; import org.apache.druid.segment.filter.Filters; @@ -214,108 +215,99 @@ public SegmentReader( @Override public Sequence apply(WindowedStorageAdapter adapter) { - return Sequences.concat( - Sequences.map( - adapter.getAdapter().makeCursors( - Filters.toFilter(dimFilter), - adapter.getInterval(), - VirtualColumns.EMPTY, - Granularities.ALL, - false, - null - ), new Function>() + final CursorBuildSpec buildSpec = CursorBuildSpec.builder() + .setFilter(Filters.toFilter(dimFilter)) + .setInterval(adapter.getInterval()) + .setGranularity(Granularities.ALL) + .build(); + final CursorMaker maker = adapter.getAdapter().asCursorMaker(buildSpec); + final Cursor cursor = maker.makeCursor(); + if (cursor == null) { + return Sequences.empty(); + } + final BaseLongColumnValueSelector timestampColumnSelector = + cursor.getColumnSelectorFactory().makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME); + + final Map dimSelectors = new HashMap<>(); + for (String dim : dims) { + final DimensionSelector dimSelector = cursor + .getColumnSelectorFactory() + .makeDimensionSelector(new DefaultDimensionSpec(dim, dim)); + // dimSelector is null if the dimension is not present + if (dimSelector != null) { + dimSelectors.put(dim, dimSelector); + } + } + + final Map metSelectors = new HashMap<>(); + for (String metric : metrics) { + final BaseObjectColumnValueSelector metricSelector = + cursor.getColumnSelectorFactory().makeColumnValueSelector(metric); + metSelectors.put(metric, metricSelector); + } + + return Sequences.simple( + new Iterable() + { + @Override + public Iterator iterator() + { + return new Iterator() { - @Nullable @Override - public Sequence apply(final Cursor cursor) + public boolean hasNext() { - final BaseLongColumnValueSelector timestampColumnSelector = - cursor.getColumnSelectorFactory().makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME); - - final Map dimSelectors = new HashMap<>(); - for (String dim : dims) { - final DimensionSelector dimSelector = cursor - .getColumnSelectorFactory() - .makeDimensionSelector(new DefaultDimensionSpec(dim, dim)); - // dimSelector is null if the dimension is not present - if (dimSelector != null) { - dimSelectors.put(dim, dimSelector); + return !cursor.isDone(); + } + + @Override + public InputRow next() + { + final Map theEvent = Maps.newLinkedHashMap(); + final long timestamp = timestampColumnSelector.getLong(); + theEvent.put(TimestampSpec.DEFAULT_COLUMN, DateTimes.utc(timestamp)); + + for (Map.Entry dimSelector : + dimSelectors.entrySet()) { + final String dim = dimSelector.getKey(); + final DimensionSelector selector = dimSelector.getValue(); + final IndexedInts vals = selector.getRow(); + + int valsSize = vals.size(); + if (valsSize == 1) { + final String dimVal = selector.lookupName(vals.get(0)); + theEvent.put(dim, dimVal); + } else if (valsSize > 1) { + List dimVals = new ArrayList<>(valsSize); + for (int i = 0; i < valsSize; ++i) { + dimVals.add(selector.lookupName(vals.get(i))); + } + theEvent.put(dim, dimVals); } } - final Map metSelectors = new HashMap<>(); - for (String metric : metrics) { - final BaseObjectColumnValueSelector metricSelector = - cursor.getColumnSelectorFactory().makeColumnValueSelector(metric); - metSelectors.put(metric, metricSelector); + for (Map.Entry metSelector : + metSelectors.entrySet()) { + final String metric = metSelector.getKey(); + final BaseObjectColumnValueSelector selector = metSelector.getValue(); + Object value = selector.getObject(); + if (value != null) { + theEvent.put(metric, value); + } } + cursor.advance(); + return new MapBasedInputRow(timestamp, dims, theEvent); + } - return Sequences.simple( - new Iterable() - { - @Override - public Iterator iterator() - { - return new Iterator() - { - @Override - public boolean hasNext() - { - return !cursor.isDone(); - } - - @Override - public InputRow next() - { - final Map theEvent = Maps.newLinkedHashMap(); - final long timestamp = timestampColumnSelector.getLong(); - theEvent.put(TimestampSpec.DEFAULT_COLUMN, DateTimes.utc(timestamp)); - - for (Map.Entry dimSelector : - dimSelectors.entrySet()) { - final String dim = dimSelector.getKey(); - final DimensionSelector selector = dimSelector.getValue(); - final IndexedInts vals = selector.getRow(); - - int valsSize = vals.size(); - if (valsSize == 1) { - final String dimVal = selector.lookupName(vals.get(0)); - theEvent.put(dim, dimVal); - } else if (valsSize > 1) { - List dimVals = new ArrayList<>(valsSize); - for (int i = 0; i < valsSize; ++i) { - dimVals.add(selector.lookupName(vals.get(i))); - } - theEvent.put(dim, dimVals); - } - } - - for (Map.Entry metSelector : - metSelectors.entrySet()) { - final String metric = metSelector.getKey(); - final BaseObjectColumnValueSelector selector = metSelector.getValue(); - Object value = selector.getObject(); - if (value != null) { - theEvent.put(metric, value); - } - } - cursor.advance(); - return new MapBasedInputRow(timestamp, dims, theEvent); - } - - @Override - public void remove() - { - throw new UnsupportedOperationException("Remove Not Supported"); - } - }; - } - } - ); + @Override + public void remove() + { + throw new UnsupportedOperationException("Remove Not Supported"); } - } - ) - ); + }; + } + } + ).withBaggage(maker); } } ) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java index 7dcaf9a8b502..f04d2eb8f7d4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java @@ -33,6 +33,7 @@ import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.MapInputRowParser; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; @@ -132,6 +133,9 @@ protected CloseableIterator> intermediateRowIterator() throw final CursorMaker maker = storageAdapter.getAdapter().asCursorMaker(cursorBuildSpec); final Cursor cursor = maker.makeCursor(); + if (cursor == null) { + return CloseableIterators.wrap(Collections.emptyIterator(), maker); + } // Retain order of columns from the original segments. Useful for preserving dimension order if we're in // schemaless mode. diff --git a/processing/src/main/java/org/apache/druid/frame/field/RowReader.java b/processing/src/main/java/org/apache/druid/frame/field/RowReader.java index c79c9b25d677..f1563d044bfe 100644 --- a/processing/src/main/java/org/apache/druid/frame/field/RowReader.java +++ b/processing/src/main/java/org/apache/druid/frame/field/RowReader.java @@ -57,7 +57,7 @@ public int fieldCount() /** * Read a particular field value as an object. * - * For performance reasons, prefer {@link org.apache.druid.frame.read.FrameReader#makeCursorFactory} + * For performance reasons, prefer {@link org.apache.druid.frame.read.FrameReader#makeCursorMakerFactory} * for reading many rows out of a frame. */ public Object readField(final Memory memory, final long rowPosition, final long rowLength, final int fieldNumber) @@ -77,7 +77,7 @@ public Object readField(final Memory memory, final long rowPosition, final long /** * Read an entire row as a list of objects. * - * For performance reasons, prefer {@link org.apache.druid.frame.read.FrameReader#makeCursorFactory} + * For performance reasons, prefer {@link org.apache.druid.frame.read.FrameReader#makeCursorMakerFactory} * for reading many rows out of a frame. */ public List readRow(final Memory memory, final long rowPosition, final long rowLength) diff --git a/processing/src/main/java/org/apache/druid/frame/read/FrameReader.java b/processing/src/main/java/org/apache/druid/frame/read/FrameReader.java index 8ddf99325d39..46855570077a 100644 --- a/processing/src/main/java/org/apache/druid/frame/read/FrameReader.java +++ b/processing/src/main/java/org/apache/druid/frame/read/FrameReader.java @@ -28,11 +28,11 @@ import org.apache.druid.frame.key.KeyColumn; import org.apache.druid.frame.read.columnar.FrameColumnReader; import org.apache.druid.frame.read.columnar.FrameColumnReaders; -import org.apache.druid.frame.segment.row.FrameCursorFactory; +import org.apache.druid.frame.segment.row.FrameCursorMakerFactory; import org.apache.druid.frame.write.FrameWriterUtils; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.segment.CursorFactory; +import org.apache.druid.segment.CursorMakerFactory; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; @@ -136,15 +136,15 @@ public ColumnCapabilities columnCapabilities(final Frame frame, final String col } /** - * Create a {@link CursorFactory} for the given frame. + * Create a {@link CursorMakerFactory} for the given frame. */ - public CursorFactory makeCursorFactory(final Frame frame) + public CursorMakerFactory makeCursorMakerFactory(final Frame frame) { switch (frame.type()) { case COLUMNAR: - return new org.apache.druid.frame.segment.columnar.FrameCursorFactory(frame, signature, columnReaders); + return new org.apache.druid.frame.segment.columnar.FrameCursorMakerFactory(frame, signature, columnReaders); case ROW_BASED: - return new FrameCursorFactory(frame, this, fieldReaders); + return new FrameCursorMakerFactory(frame, this, fieldReaders); default: throw new ISE("Unrecognized frame type [%s]", frame.type()); } diff --git a/processing/src/main/java/org/apache/druid/frame/segment/FrameCursor.java b/processing/src/main/java/org/apache/druid/frame/segment/FrameCursor.java index 35672c0338d0..63f87f7efad7 100644 --- a/processing/src/main/java/org/apache/druid/frame/segment/FrameCursor.java +++ b/processing/src/main/java/org/apache/druid/frame/segment/FrameCursor.java @@ -19,14 +19,15 @@ package org.apache.druid.frame.segment; +import org.apache.druid.frame.segment.columnar.FrameCursorMakerFactory; import org.apache.druid.query.BaseQuery; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.SimpleSettableOffset; /** - * An implementation of {@link Cursor} used by {@link org.apache.druid.frame.segment.row.FrameCursorFactory} - * and {@link org.apache.druid.frame.segment.columnar.FrameCursorFactory}. + * An implementation of {@link Cursor} used by {@link org.apache.druid.frame.segment.row.FrameCursorMakerFactory} + * and {@link FrameCursorMakerFactory}. * * Adds the methods {@link #getCurrentRow()} and {@link #setCurrentRow(int)} so the cursor can be moved to * particular rows. diff --git a/processing/src/main/java/org/apache/druid/frame/segment/FrameStorageAdapter.java b/processing/src/main/java/org/apache/druid/frame/segment/FrameStorageAdapter.java index b82529de506b..fcc8cfdaa4f4 100644 --- a/processing/src/main/java/org/apache/druid/frame/segment/FrameStorageAdapter.java +++ b/processing/src/main/java/org/apache/druid/frame/segment/FrameStorageAdapter.java @@ -22,8 +22,8 @@ import org.apache.druid.frame.Frame; import org.apache.druid.frame.read.FrameReader; import org.apache.druid.segment.CursorBuildSpec; -import org.apache.druid.segment.CursorFactory; import org.apache.druid.segment.CursorMaker; +import org.apache.druid.segment.CursorMakerFactory; import org.apache.druid.segment.DimensionDictionarySelector; import org.apache.druid.segment.Metadata; import org.apache.druid.segment.StorageAdapter; @@ -47,14 +47,14 @@ public class FrameStorageAdapter implements StorageAdapter private final Frame frame; private final FrameReader frameReader; private final Interval interval; - private final CursorFactory cursorFactory; + private final CursorMakerFactory cursorFactory; public FrameStorageAdapter(Frame frame, FrameReader frameReader, Interval interval) { this.frame = frame; this.frameReader = frameReader; this.interval = interval; - this.cursorFactory = frameReader.makeCursorFactory(frame); + this.cursorFactory = frameReader.makeCursorMakerFactory(frame); } @Override diff --git a/processing/src/main/java/org/apache/druid/frame/segment/columnar/FrameCursorFactory.java b/processing/src/main/java/org/apache/druid/frame/segment/columnar/FrameCursorMakerFactory.java similarity index 95% rename from processing/src/main/java/org/apache/druid/frame/segment/columnar/FrameCursorFactory.java rename to processing/src/main/java/org/apache/druid/frame/segment/columnar/FrameCursorMakerFactory.java index e6aaf9d6fe6c..81945b795be9 100644 --- a/processing/src/main/java/org/apache/druid/frame/segment/columnar/FrameCursorFactory.java +++ b/processing/src/main/java/org/apache/druid/frame/segment/columnar/FrameCursorMakerFactory.java @@ -34,8 +34,8 @@ import org.apache.druid.segment.ColumnCache; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.CursorBuildSpec; -import org.apache.druid.segment.CursorFactory; import org.apache.druid.segment.CursorMaker; +import org.apache.druid.segment.CursorMakerFactory; import org.apache.druid.segment.QueryableIndexColumnSelectorFactory; import org.apache.druid.segment.SimpleAscendingOffset; import org.apache.druid.segment.SimpleDescendingOffset; @@ -55,19 +55,19 @@ import java.util.List; /** - * A {@link CursorFactory} implementation based on a single columnar {@link Frame}. + * A {@link CursorMakerFactory} implementation based on a single columnar {@link Frame}. * * This class is only used for columnar frames. It is not used for row-based frames. * - * @see org.apache.druid.frame.segment.row.FrameCursorFactory the row-based version + * @see org.apache.druid.frame.segment.row.FrameCursorMakerFactory the row-based version */ -public class FrameCursorFactory implements CursorFactory +public class FrameCursorMakerFactory implements CursorMakerFactory { private final Frame frame; private final RowSignature signature; private final List columnReaders; - public FrameCursorFactory( + public FrameCursorMakerFactory( final Frame frame, final RowSignature signature, final List columnReaders diff --git a/processing/src/main/java/org/apache/druid/frame/segment/columnar/FrameQueryableIndex.java b/processing/src/main/java/org/apache/druid/frame/segment/columnar/FrameQueryableIndex.java index f42a33ce6f78..94e4d637ddf5 100644 --- a/processing/src/main/java/org/apache/druid/frame/segment/columnar/FrameQueryableIndex.java +++ b/processing/src/main/java/org/apache/druid/frame/segment/columnar/FrameQueryableIndex.java @@ -42,7 +42,7 @@ * A {@link QueryableIndex} implementation based on a single columnar {@link Frame}. There is no internal caching * of columns here, so callers should generally wrap this in a {@link org.apache.druid.segment.ColumnCache}. * - * This class exists so {@link FrameCursorFactory} can reuse code meant for regular segment-backed + * This class exists so {@link FrameCursorMakerFactory} can reuse code meant for regular segment-backed * {@link QueryableIndex}. Some methods are implemented by throwing {@link UnsupportedOperationException}, wherever * it is not expected that those methods are actually going to be needed. */ diff --git a/processing/src/main/java/org/apache/druid/frame/segment/row/FrameCursorFactory.java b/processing/src/main/java/org/apache/druid/frame/segment/row/FrameCursorMakerFactory.java similarity index 91% rename from processing/src/main/java/org/apache/druid/frame/segment/row/FrameCursorFactory.java rename to processing/src/main/java/org/apache/druid/frame/segment/row/FrameCursorMakerFactory.java index a2005d91405a..56e84387b35c 100644 --- a/processing/src/main/java/org/apache/druid/frame/segment/row/FrameCursorFactory.java +++ b/processing/src/main/java/org/apache/druid/frame/segment/row/FrameCursorMakerFactory.java @@ -31,8 +31,8 @@ import org.apache.druid.query.filter.Filter; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.CursorBuildSpec; -import org.apache.druid.segment.CursorFactory; import org.apache.druid.segment.CursorMaker; +import org.apache.druid.segment.CursorMakerFactory; import org.apache.druid.segment.SimpleAscendingOffset; import org.apache.druid.segment.SimpleDescendingOffset; import org.apache.druid.segment.SimpleSettableOffset; @@ -40,19 +40,19 @@ import java.util.List; /** - * A {@link CursorFactory} implementation based on a single row-based {@link Frame}. + * A {@link CursorMakerFactory} implementation based on a single row-based {@link Frame}. * * This class is only used for row-based frames. * - * @see org.apache.druid.frame.segment.columnar.FrameCursorFactory the columnar version + * @see org.apache.druid.frame.segment.columnar.FrameCursorMakerFactory the columnar version */ -public class FrameCursorFactory implements CursorFactory +public class FrameCursorMakerFactory implements CursorMakerFactory { private final Frame frame; private final FrameReader frameReader; private final List fieldReaders; - public FrameCursorFactory( + public FrameCursorMakerFactory( final Frame frame, final FrameReader frameReader, final List fieldReaders diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java index a7dbd192b504..19a093bdf98d 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java @@ -231,6 +231,10 @@ private Pair materializeStorageAdapter(StorageAdapter as) try (final CursorMaker maker = as.asCursorMaker(builder.build())) { final Cursor cursor = maker.makeCursor(); + if (cursor == null) { + return null; + } + final AtomicReference siggy = new AtomicReference<>(null); long remainingRowsToSkip = limit.getOffset(); diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/StorageAdapterRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/StorageAdapterRowsAndColumns.java index 29bdad4aafc9..17aa2443544c 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/StorageAdapterRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/StorageAdapterRowsAndColumns.java @@ -98,6 +98,10 @@ private static RowsAndColumns materialize(StorageAdapter as) try (final CursorMaker maker = as.asCursorMaker(CursorBuildSpec.FULL_SCAN)) { final Cursor cursor = maker.makeCursor(); + if (cursor == null) { + return new EmptyRowsAndColumns(); + } + final RowSignature rowSignature = as.getRowSignature(); final ColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory(); diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java index 094c38c8a97e..4dc5841c78a8 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java @@ -122,6 +122,9 @@ public Sequence process( public Iterator make() { final Cursor cursor = maker.makeCursor(); + if (cursor == null) { + return Collections.emptyIterator(); + } final List columnSelectors = new ArrayList<>(allColumns.size()); final RowSignature.Builder rowSignatureBuilder = RowSignature.builder(); final ColumnSelectorFactory factory = cursor.getColumnSelectorFactory(); diff --git a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java index 96da640110a5..4d561a1c712f 100644 --- a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java @@ -123,7 +123,7 @@ private DateTime getTimeBoundary(StorageAdapter adapter, TimeBoundaryQuery legac if (cursor == null) { return null; } - final Result result = skipToFirstMatching.apply(maker.makeCursor()); + final Result result = skipToFirstMatching.apply(cursor); return result == null ? null : result.getValue(); } } diff --git a/processing/src/main/java/org/apache/druid/segment/CursorFactory.java b/processing/src/main/java/org/apache/druid/segment/CursorFactory.java index 8ebd2fd444df..68363e740fad 100644 --- a/processing/src/main/java/org/apache/druid/segment/CursorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/CursorFactory.java @@ -36,8 +36,9 @@ * * @see StorageAdapter */ -public interface CursorFactory +public interface CursorFactory extends CursorMakerFactory { + @Override default CursorMaker asCursorMaker(CursorBuildSpec spec) { return new CursorMaker() diff --git a/processing/src/main/java/org/apache/druid/segment/CursorMakerFactory.java b/processing/src/main/java/org/apache/druid/segment/CursorMakerFactory.java new file mode 100644 index 000000000000..31f6be54f4c7 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/CursorMakerFactory.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +public interface CursorMakerFactory +{ + CursorMaker asCursorMaker(CursorBuildSpec spec); +} diff --git a/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java b/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java index eba801e224e2..4f8c9ac2ebfb 100644 --- a/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java @@ -55,7 +55,6 @@ import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; -import java.util.Objects; import java.util.Set; /** @@ -114,7 +113,9 @@ public Cursor makeCursor() { final CursorMaker maker = closer.register(baseAdapter.asCursorMaker(unnestBuildSpec)); final Cursor cursor = maker.makeCursor(); - Objects.requireNonNull(cursor); + if (cursor == null) { + return null; + } final ColumnCapabilities capabilities = unnestColumn.capabilities( cursor.getColumnSelectorFactory(), unnestColumn.getOutputName() diff --git a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java index 4319fdc52b57..9f87694c0be1 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java @@ -291,8 +291,10 @@ public Cursor makeCursor() final Cursor baseCursor = joinablesCloser.register(baseAdapter.asCursorMaker(cursorBuildSpecBuilder.build())) .makeCursor(); + if (baseCursor == null) { + return null; + } - assert baseCursor != null; Cursor retVal = baseCursor; for (JoinableClause clause : clauses) { diff --git a/processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java b/processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java index edc9bb056158..d170b0578544 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java +++ b/processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java @@ -50,6 +50,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.Set; @@ -122,6 +123,10 @@ public BroadcastSegmentIndexedTable( .build(); try (final CursorMaker maker = adapter.asCursorMaker(buildSpec)) { final Cursor cursor = maker.makeCursor(); + if (cursor == null) { + this.keyColumnsIndexes = Collections.emptyList(); + return; + } int rowNumber = 0; ColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory(); diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTest.java index ce4a417e2bf3..694e29b4416e 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTest.java @@ -30,13 +30,13 @@ import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.Druids; +import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunnerTestHelper; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.spec.QuerySegmentSpec; import org.apache.druid.segment.CursorBuildSpec; -import org.apache.druid.segment.VirtualColumns; -import org.apache.druid.query.Query; import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; diff --git a/services/src/main/java/org/apache/druid/cli/DumpSegment.java b/services/src/main/java/org/apache/druid/cli/DumpSegment.java index 3ad58603ea37..116ce0169fb7 100644 --- a/services/src/main/java/org/apache/druid/cli/DumpSegment.java +++ b/services/src/main/java/org/apache/druid/cli/DumpSegment.java @@ -311,6 +311,9 @@ public static void runDump( try (final CursorMaker maker = adapter.asCursorMaker(buildSpec)) { final Cursor cursor = maker.makeCursor(); + if (cursor == null) { + return; + } withOutputStream( new Function()