From 73a644258d69c05baac127c17de90b23e0f79f5e Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Sun, 15 Sep 2024 16:45:51 -0700 Subject: [PATCH] abstract `IncrementalIndex` cursor stuff to prepare for using different "views" of the data based on the cursor build spec (#17064) * abstract `IncrementalIndex` cursor stuff to prepare to allow for possibility of using different "views" of the data based on the cursor build spec changes: * introduce `IncrementalIndexRowSelector` interface to capture how `IncrementalIndexCursor` and `IncrementalIndexColumnSelectorFactory` read data * `IncrementalIndex` implements `IncrementalIndexRowSelector` * move `FactsHolder` interface to separate file * other minor refactorings --- .../segment/incremental/FactsHolder.java | 82 ++++ .../segment/incremental/IncrementalIndex.java | 453 +++++++++--------- ...IncrementalIndexColumnSelectorFactory.java | 22 +- .../IncrementalIndexCursorFactory.java | 6 +- .../IncrementalIndexCursorHolder.java | 68 ++- .../incremental/IncrementalIndexRow.java | 2 + .../IncrementalIndexRowSelector.java | 104 ++++ .../incremental/OnheapIncrementalIndex.java | 86 ++-- .../test/TestFrameProcessorUtils.java | 17 +- .../segment/AutoTypeColumnIndexerTest.java | 44 +- .../NestedDataColumnIndexerV4Test.java | 21 +- .../IncrementalIndexIngestionTest.java | 32 +- .../IncrementalIndexMultiValueSpecTest.java | 17 +- .../virtual/ExpressionSelectorsTest.java | 15 +- 14 files changed, 572 insertions(+), 397 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/segment/incremental/FactsHolder.java create mode 100644 processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRowSelector.java diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/FactsHolder.java b/processing/src/main/java/org/apache/druid/segment/incremental/FactsHolder.java new file mode 100644 index 000000000000..f7eede101509 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/incremental/FactsHolder.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.incremental; + +import java.util.Comparator; +import java.util.Iterator; + +/** + * {@link IncrementalIndexRow} storage interface, a mutable data structure for building up a set or rows to eventually + * persist into an immutable segment + * + * @see IncrementalIndex for the data processor which constructs {@link IncrementalIndexRow} to store here + */ +public interface FactsHolder +{ + /** + * @return the previous rowIndex associated with the specified key, or + * {@link IncrementalIndexRow#EMPTY_ROW_INDEX} if there was no mapping for the key. + */ + int getPriorIndex(IncrementalIndexRow key); + + /** + * Get minimum {@link IncrementalIndexRow#getTimestamp()} present in the facts holder + */ + long getMinTimeMillis(); + + /** + * Get maximum {@link IncrementalIndexRow#getTimestamp()} present in the facts holder + */ + long getMaxTimeMillis(); + + /** + * Get all {@link IncrementalIndex}, depending on the implementation, these rows may or may not be ordered in the same + * order they will be persisted in. Use {@link #persistIterable()} if this is required. + */ + Iterator iterator(boolean descending); + + /** + * Get all {@link IncrementalIndexRow} with {@link IncrementalIndexRow#getTimestamp()} between the start and end + * timestamps specified + */ + Iterable timeRangeIterable(boolean descending, long timeStart, long timeEnd); + + /** + * Get all row {@link IncrementalIndexRow} 'keys', which is distinct groups if this is an aggregating facts holder or + * just every row present if not + */ + Iterable keySet(); + + /** + * Get all {@link IncrementalIndexRow} to persist, ordered with {@link Comparator } + */ + Iterable persistIterable(); + + /** + * @return the previous rowIndex associated with the specified key, or + * {@link IncrementalIndexRow#EMPTY_ROW_INDEX} if there was no mapping for the key. + */ + int putIfAbsent(IncrementalIndexRow key, int rowIndex); + + /** + * Clear all rows present in the facts holder + */ + void clear(); +} diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java index fc2a02c47b7b..8adc47f65336 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java @@ -105,7 +105,7 @@ * {@link IncrementalIndexCursorFactory} are thread-safe, and may be called concurrently with each other, and with * the "add" methods. This concurrency model supports real-time queries of the data in the index. */ -public abstract class IncrementalIndex implements Iterable, Closeable, ColumnInspector +public abstract class IncrementalIndex implements IncrementalIndexRowSelector, ColumnInspector, Iterable, Closeable { /** * Column selector used at ingestion time for inputs to aggregators. @@ -255,8 +255,9 @@ public ColumnCapabilities getColumnCapabilities(String columnName) private final boolean useSchemaDiscovery; - private final InputRowHolder inputRowHolder = new InputRowHolder(); + protected final InputRowHolder inputRowHolder = new InputRowHolder(); + @Nullable private volatile DateTime maxIngestedEventTime; /** @@ -366,8 +367,6 @@ protected IncrementalIndex( ); } - public abstract FactsHolder getFacts(); - public abstract boolean canAppendRow(); public abstract String getOutOfRowsReason(); @@ -384,100 +383,11 @@ protected abstract AddToFactsResult addToFacts( boolean skipMaxRowsInMemoryCheck ) throws IndexSizeExceededException; - public abstract int getLastRowIndex(); - - protected abstract float getMetricFloatValue(int rowOffset, int aggOffset); - - protected abstract long getMetricLongValue(int rowOffset, int aggOffset); - - protected abstract Object getMetricObjectValue(int rowOffset, int aggOffset); - - protected abstract double getMetricDoubleValue(int rowOffset, int aggOffset); - - protected abstract boolean isNull(int rowOffset, int aggOffset); - - static class IncrementalIndexRowResult - { - private final IncrementalIndexRow incrementalIndexRow; - private final List parseExceptionMessages; - - IncrementalIndexRowResult(IncrementalIndexRow incrementalIndexRow, List parseExceptionMessages) - { - this.incrementalIndexRow = incrementalIndexRow; - this.parseExceptionMessages = parseExceptionMessages; - } - - IncrementalIndexRow getIncrementalIndexRow() - { - return incrementalIndexRow; - } - - List getParseExceptionMessages() - { - return parseExceptionMessages; - } - } - - static class AddToFactsResult - { - private final int rowCount; - private final long bytesInMemory; - private final List parseExceptionMessages; - - public AddToFactsResult( - int rowCount, - long bytesInMemory, - List parseExceptionMessages - ) - { - this.rowCount = rowCount; - this.bytesInMemory = bytesInMemory; - this.parseExceptionMessages = parseExceptionMessages; - } - - int getRowCount() - { - return rowCount; - } - - public long getBytesInMemory() - { - return bytesInMemory; - } - - public List getParseExceptionMessages() - { - return parseExceptionMessages; - } - } - - public static class InputRowHolder - { - @Nullable - private InputRow row; - private long rowId = -1; - - public void set(final InputRow row) - { - this.row = row; - this.rowId++; - } - public void unset() - { - this.row = null; - } - - public InputRow getRow() - { - return Preconditions.checkNotNull(row, "row"); - } - - public long getRowId() - { - return rowId; - } - } + public abstract Iterable iterableWithPostAggregations( + @Nullable List postAggs, + boolean descending + ); public boolean isRollup() { @@ -746,23 +656,6 @@ public static ParseException getCombinedParseException( ); } - private static String getSimplifiedEventStringFromRow(InputRow inputRow) - { - if (inputRow instanceof MapBasedInputRow) { - return ((MapBasedInputRow) inputRow).getEvent().toString(); - } - - if (inputRow instanceof ListBasedInputRow) { - return ((ListBasedInputRow) inputRow).asMap().toString(); - } - - if (inputRow instanceof TransformedInputRow) { - InputRow innerRow = ((TransformedInputRow) inputRow).getBaseRow(); - return getSimplifiedEventStringFromRow(innerRow); - } - - return inputRow.toString(); - } private synchronized void updateMaxIngestedTime(DateTime eventTime) { @@ -771,6 +664,7 @@ private synchronized void updateMaxIngestedTime(DateTime eventTime) } } + @Override public boolean isEmpty() { return numEntries.get() == 0; @@ -861,6 +755,7 @@ public List getDimensions() /** * Returns the descriptor for a particular dimension. */ + @Override @Nullable public DimensionDesc getDimension(String dimension) { @@ -869,22 +764,39 @@ public DimensionDesc getDimension(String dimension) } } - public ColumnValueSelector makeMetricColumnValueSelector(String metric, IncrementalIndexRowHolder currEntry) + @Override + @Nullable + public MetricDesc getMetric(String metric) { - MetricDesc metricDesc = metricDescs.get(metric); + return metricDescs.get(metric); + } + + @Override + public List getOrdering() + { + return metadata.getOrdering(); + } + + public static ColumnValueSelector makeMetricColumnValueSelector( + IncrementalIndexRowSelector rowSelector, + IncrementalIndexRowHolder currEntry, + String metric + ) + { + final MetricDesc metricDesc = rowSelector.getMetric(metric); if (metricDesc == null) { return NilColumnValueSelector.instance(); } int metricIndex = metricDesc.getIndex(); switch (metricDesc.getCapabilities().getType()) { case COMPLEX: - return new ObjectMetricColumnSelector(metricDesc, currEntry, metricIndex); + return new ObjectMetricColumnSelector(rowSelector, currEntry, metricDesc); case LONG: - return new LongMetricColumnSelector(currEntry, metricIndex); + return new LongMetricColumnSelector(rowSelector, currEntry, metricIndex); case FLOAT: - return new FloatMetricColumnSelector(currEntry, metricIndex); + return new FloatMetricColumnSelector(rowSelector, currEntry, metricIndex); case DOUBLE: - return new DoubleMetricColumnSelector(currEntry, metricIndex); + return new DoubleMetricColumnSelector(rowSelector, currEntry, metricIndex); case STRING: throw new IllegalStateException("String is not a metric column type"); default: @@ -910,13 +822,6 @@ public DateTime getMaxTime() return isEmpty() ? null : DateTimes.utc(getMaxTimeMillis()); } - @Nullable - public Integer getDimensionIndex(String dimension) - { - DimensionDesc dimSpec = getDimension(dimension); - return dimSpec == null ? null : dimSpec.getIndex(); - } - /** * Returns names of time and dimension columns, in persist sort order. Includes {@link ColumnHolder#TIME_COLUMN_NAME}. */ @@ -1003,6 +908,49 @@ public Metadata getMetadata() return metadata; } + @Override + public Iterator iterator() + { + return iterableWithPostAggregations(null, false).iterator(); + } + + public DateTime getMaxIngestedEventTime() + { + return maxIngestedEventTime; + } + + protected ColumnSelectorFactory makeColumnSelectorFactory( + @Nullable final AggregatorFactory agg, + final InputRowHolder in + ) + { + return makeColumnSelectorFactory(virtualColumns, in, agg); + } + + protected final Comparator dimsComparator() + { + return new IncrementalIndexRowComparator(timePosition, dimensionDescsList); + } + + + private static String getSimplifiedEventStringFromRow(InputRow inputRow) + { + if (inputRow instanceof MapBasedInputRow) { + return ((MapBasedInputRow) inputRow).getEvent().toString(); + } + + if (inputRow instanceof ListBasedInputRow) { + return ((ListBasedInputRow) inputRow).asMap().toString(); + } + + if (inputRow instanceof TransformedInputRow) { + InputRow innerRow = ((TransformedInputRow) inputRow).getBaseRow(); + return getSimplifiedEventStringFromRow(innerRow); + } + + return inputRow.toString(); + } + private static AggregatorFactory[] getCombiningAggregators(AggregatorFactory[] aggregators) { AggregatorFactory[] combiningAggregators = new AggregatorFactory[aggregators.length]; @@ -1012,30 +960,24 @@ private static AggregatorFactory[] getCombiningAggregators(AggregatorFactory[] a return combiningAggregators; } - @Override - public Iterator iterator() - { - return iterableWithPostAggregations(null, false).iterator(); - } - - public abstract Iterable iterableWithPostAggregations( - @Nullable List postAggs, - boolean descending - ); - - public DateTime getMaxIngestedEventTime() + private static boolean allNull(Object[] dims, int startPosition) { - return maxIngestedEventTime; + for (int i = startPosition; i < dims.length; i++) { + if (dims[i] != null) { + return false; + } + } + return true; } public static final class DimensionDesc { private final int index; private final String name; - private final DimensionHandler handler; - private final DimensionIndexer indexer; + private final DimensionHandler handler; + private final DimensionIndexer indexer; - public DimensionDesc(int index, String name, DimensionHandler handler, boolean useMaxMemoryEstimates) + public DimensionDesc(int index, String name, DimensionHandler handler, boolean useMaxMemoryEstimates) { this.index = index; this.name = name; @@ -1058,12 +1000,12 @@ public ColumnCapabilities getCapabilities() return indexer.getColumnCapabilities(); } - public DimensionHandler getHandler() + public DimensionHandler getHandler() { return handler; } - public DimensionIndexer getIndexer() + public DimensionIndexer getIndexer() { return indexer; } @@ -1124,19 +1066,90 @@ public ColumnCapabilities getCapabilities() } } - protected ColumnSelectorFactory makeColumnSelectorFactory( - @Nullable final AggregatorFactory agg, - final InputRowHolder in - ) + public static class AddToFactsResult { - return makeColumnSelectorFactory(virtualColumns, in, agg); + private final int rowCount; + private final long bytesInMemory; + private final List parseExceptionMessages; + + public AddToFactsResult( + int rowCount, + long bytesInMemory, + List parseExceptionMessages + ) + { + this.rowCount = rowCount; + this.bytesInMemory = bytesInMemory; + this.parseExceptionMessages = parseExceptionMessages; + } + + int getRowCount() + { + return rowCount; + } + + public long getBytesInMemory() + { + return bytesInMemory; + } + + public List getParseExceptionMessages() + { + return parseExceptionMessages; + } } - protected final Comparator dimsComparator() + public static class InputRowHolder { - return new IncrementalIndexRowComparator(timePosition, dimensionDescsList); + @Nullable + private InputRow row; + private long rowId = -1; + + public void set(final InputRow row) + { + this.row = row; + this.rowId++; + } + + public void unset() + { + this.row = null; + } + + public InputRow getRow() + { + return Preconditions.checkNotNull(row, "row"); + } + + public long getRowId() + { + return rowId; + } } + static class IncrementalIndexRowResult + { + private final IncrementalIndexRow incrementalIndexRow; + private final List parseExceptionMessages; + + IncrementalIndexRowResult(IncrementalIndexRow incrementalIndexRow, List parseExceptionMessages) + { + this.incrementalIndexRow = incrementalIndexRow; + this.parseExceptionMessages = parseExceptionMessages; + } + + IncrementalIndexRow getIncrementalIndexRow() + { + return incrementalIndexRow; + } + + List getParseExceptionMessages() + { + return parseExceptionMessages; + } + } + + @VisibleForTesting static final class IncrementalIndexRowComparator implements Comparator { @@ -1207,57 +1220,19 @@ public int compare(IncrementalIndexRow lhs, IncrementalIndexRow rhs) } } - private static boolean allNull(Object[] dims, int startPosition) - { - for (int i = startPosition; i < dims.length; i++) { - if (dims[i] != null) { - return false; - } - } - return true; - } - - public interface FactsHolder - { - /** - * @return the previous rowIndex associated with the specified key, or - * {@link IncrementalIndexRow#EMPTY_ROW_INDEX} if there was no mapping for the key. - */ - int getPriorIndex(IncrementalIndexRow key); - - long getMinTimeMillis(); - - long getMaxTimeMillis(); - - Iterator iterator(boolean descending); - - Iterable timeRangeIterable(boolean descending, long timeStart, long timeEnd); - - Iterable keySet(); - - /** - * Get all {@link IncrementalIndexRow} to persist, ordered with {@link Comparator} - * - * @return - */ - Iterable persistIterable(); - - /** - * @return the previous rowIndex associated with the specified key, or - * {@link IncrementalIndexRow#EMPTY_ROW_INDEX} if there was no mapping for the key. - */ - int putIfAbsent(IncrementalIndexRow key, int rowIndex); - - void clear(); - } - - private final class LongMetricColumnSelector implements LongColumnSelector + private static final class LongMetricColumnSelector implements LongColumnSelector { + private final IncrementalIndexRowSelector rowSelector; private final IncrementalIndexRowHolder currEntry; private final int metricIndex; - public LongMetricColumnSelector(IncrementalIndexRowHolder currEntry, int metricIndex) + public LongMetricColumnSelector( + IncrementalIndexRowSelector rowSelector, + IncrementalIndexRowHolder currEntry, + int metricIndex + ) { + this.rowSelector = rowSelector; this.currEntry = currEntry; this.metricIndex = metricIndex; } @@ -1265,119 +1240,131 @@ public LongMetricColumnSelector(IncrementalIndexRowHolder currEntry, int metricI @Override public long getLong() { - assert NullHandling.replaceWithDefault() || !isNull(); - return getMetricLongValue(currEntry.get().getRowIndex(), metricIndex); + return rowSelector.getMetricLongValue(currEntry.get().getRowIndex(), metricIndex); } @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) + public boolean isNull() { - inspector.visit("index", IncrementalIndex.this); + return rowSelector.isNull(currEntry.get().getRowIndex(), metricIndex); } @Override - public boolean isNull() + public void inspectRuntimeShape(RuntimeShapeInspector inspector) { - return IncrementalIndex.this.isNull(currEntry.get().getRowIndex(), metricIndex); + inspector.visit("index", rowSelector); } } - private final class ObjectMetricColumnSelector extends ObjectColumnSelector + private static final class FloatMetricColumnSelector implements FloatColumnSelector { + private final IncrementalIndexRowSelector rowSelector; private final IncrementalIndexRowHolder currEntry; private final int metricIndex; - private Class classOfObject; - public ObjectMetricColumnSelector( - MetricDesc metricDesc, + public FloatMetricColumnSelector( + IncrementalIndexRowSelector rowSelector, IncrementalIndexRowHolder currEntry, int metricIndex ) { this.currEntry = currEntry; + this.rowSelector = rowSelector; this.metricIndex = metricIndex; - classOfObject = ComplexMetrics.getSerdeForType(metricDesc.getType()).getObjectStrategy().getClazz(); } - @Nullable @Override - public Object getObject() + public float getFloat() { - return getMetricObjectValue(currEntry.get().getRowIndex(), metricIndex); + return rowSelector.getMetricFloatValue(currEntry.get().getRowIndex(), metricIndex); } @Override - public Class classOfObject() + public void inspectRuntimeShape(RuntimeShapeInspector inspector) { - return classOfObject; + inspector.visit("index", rowSelector); } @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) + public boolean isNull() { - inspector.visit("index", IncrementalIndex.this); + return rowSelector.isNull(currEntry.get().getRowIndex(), metricIndex); } } - private final class FloatMetricColumnSelector implements FloatColumnSelector + private static final class DoubleMetricColumnSelector implements DoubleColumnSelector { + private final IncrementalIndexRowSelector rowSelector; private final IncrementalIndexRowHolder currEntry; private final int metricIndex; - public FloatMetricColumnSelector(IncrementalIndexRowHolder currEntry, int metricIndex) + public DoubleMetricColumnSelector( + IncrementalIndexRowSelector rowSelector, + IncrementalIndexRowHolder currEntry, + int metricIndex + ) { this.currEntry = currEntry; + this.rowSelector = rowSelector; this.metricIndex = metricIndex; } @Override - public float getFloat() + public double getDouble() { assert NullHandling.replaceWithDefault() || !isNull(); - return getMetricFloatValue(currEntry.get().getRowIndex(), metricIndex); + return rowSelector.getMetricDoubleValue(currEntry.get().getRowIndex(), metricIndex); } @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) + public boolean isNull() { - inspector.visit("index", IncrementalIndex.this); + return rowSelector.isNull(currEntry.get().getRowIndex(), metricIndex); } @Override - public boolean isNull() + public void inspectRuntimeShape(RuntimeShapeInspector inspector) { - return IncrementalIndex.this.isNull(currEntry.get().getRowIndex(), metricIndex); + inspector.visit("index", rowSelector); } } - private final class DoubleMetricColumnSelector implements DoubleColumnSelector + private static final class ObjectMetricColumnSelector extends ObjectColumnSelector { + private final IncrementalIndexRowSelector rowSelector; private final IncrementalIndexRowHolder currEntry; private final int metricIndex; + private final Class classOfObject; - public DoubleMetricColumnSelector(IncrementalIndexRowHolder currEntry, int metricIndex) + public ObjectMetricColumnSelector( + IncrementalIndexRowSelector rowSelector, + IncrementalIndexRowHolder currEntry, + MetricDesc metricDesc + ) { this.currEntry = currEntry; - this.metricIndex = metricIndex; + this.rowSelector = rowSelector; + this.metricIndex = metricDesc.getIndex(); + this.classOfObject = ComplexMetrics.getSerdeForType(metricDesc.getType()).getObjectStrategy().getClazz(); } + @Nullable @Override - public double getDouble() + public Object getObject() { - assert NullHandling.replaceWithDefault() || !isNull(); - return getMetricDoubleValue(currEntry.get().getRowIndex(), metricIndex); + return rowSelector.getMetricObjectValue(currEntry.get().getRowIndex(), metricIndex); } @Override - public boolean isNull() + public Class classOfObject() { - return IncrementalIndex.this.isNull(currEntry.get().getRowIndex(), metricIndex); + return classOfObject; } @Override public void inspectRuntimeShape(RuntimeShapeInspector inspector) { - inspector.visit("index", IncrementalIndex.this); + inspector.visit("index", rowSelector); } } } diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexColumnSelectorFactory.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexColumnSelectorFactory.java index 86e8c6690c2d..9d60edef0449 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexColumnSelectorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexColumnSelectorFactory.java @@ -43,29 +43,29 @@ class IncrementalIndexColumnSelectorFactory implements ColumnSelectorFactory, RowIdSupplier { private final ColumnInspector snapshotColumnInspector; - private final IncrementalIndex index; private final VirtualColumns virtualColumns; private final Order timeOrder; private final IncrementalIndexRowHolder rowHolder; + private final IncrementalIndexRowSelector rowSelector; IncrementalIndexColumnSelectorFactory( - IncrementalIndex index, + IncrementalIndexRowSelector rowSelector, VirtualColumns virtualColumns, Order timeOrder, IncrementalIndexRowHolder rowHolder ) { - this.index = index; this.virtualColumns = virtualColumns; this.timeOrder = timeOrder; this.rowHolder = rowHolder; + this.rowSelector = rowSelector; this.snapshotColumnInspector = new ColumnInspector() { @Nullable @Override public ColumnCapabilities getColumnCapabilities(String column) { - return IncrementalIndexCursorFactory.snapshotColumnCapabilities(index, column); + return IncrementalIndexCursorFactory.snapshotColumnCapabilities(rowSelector, column); } }; } @@ -87,13 +87,13 @@ private DimensionSelector makeDimensionSelectorUndecorated(DimensionSpec dimensi if (dimension.equals(ColumnHolder.TIME_COLUMN_NAME) && timeOrder != Order.NONE) { return new SingleScanTimeDimensionSelector( - makeColumnValueSelector(dimension), + makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), extractionFn, timeOrder ); } - final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(dimensionSpec.getDimension()); + final IncrementalIndex.DimensionDesc dimensionDesc = rowSelector.getDimension(dimensionSpec.getDimension()); if (dimensionDesc == null) { // not a dimension, column may be a metric ColumnCapabilities capabilities = getColumnCapabilities(dimension); @@ -122,19 +122,17 @@ public ColumnValueSelector makeColumnValueSelector(String columnName) if (virtualColumns.exists(columnName)) { return virtualColumns.makeColumnValueSelector(columnName, this); } - - if (columnName.equals(ColumnHolder.TIME_COLUMN_NAME)) { + if (ColumnHolder.TIME_COLUMN_NAME.equals(columnName)) { return rowHolder; } - final Integer dimIndex = index.getDimensionIndex(columnName); - if (dimIndex != null) { - final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(columnName); + final IncrementalIndex.DimensionDesc dimensionDesc = rowSelector.getDimension(columnName); + if (dimensionDesc != null) { final DimensionIndexer indexer = dimensionDesc.getIndexer(); return indexer.makeColumnValueSelector(rowHolder, dimensionDesc); } - return index.makeMetricColumnValueSelector(columnName, rowHolder); + return IncrementalIndex.makeMetricColumnValueSelector(rowSelector, rowHolder, columnName); } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java index e034f820dfbf..b73a7b682a30 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java @@ -99,9 +99,9 @@ public ColumnCapabilities getColumnCapabilities(String column) return snapshotColumnCapabilities(index, column); } - static ColumnCapabilities snapshotColumnCapabilities(IncrementalIndex index, String column) + static ColumnCapabilities snapshotColumnCapabilities(IncrementalIndexRowSelector selector, String column) { - IncrementalIndex.DimensionDesc desc = index.getDimension(column); + IncrementalIndex.DimensionDesc desc = selector.getDimension(column); // nested column indexer is a liar, and behaves like any type if it only processes unnested literals of a single // type, so force it to use nested column type if (desc != null && desc.getIndexer() instanceof NestedDataColumnIndexerV4) { @@ -122,7 +122,7 @@ static ColumnCapabilities snapshotColumnCapabilities(IncrementalIndex index, Str // multi-valuedness at cursor creation time, instead of the latest state, and getSnapshotColumnCapabilities could // be removed. return ColumnCapabilitiesImpl.snapshot( - index.getColumnCapabilities(column), + selector.getColumnCapabilities(column), COERCE_LOGIC ); } diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorHolder.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorHolder.java index 02c09398d8e5..72ec9116d1f7 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorHolder.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorHolder.java @@ -23,52 +23,46 @@ import org.apache.druid.query.BaseQuery; import org.apache.druid.query.Order; import org.apache.druid.query.OrderBy; -import org.apache.druid.query.filter.Filter; import org.apache.druid.query.filter.ValueMatcher; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.CursorBuildSpec; import org.apache.druid.segment.CursorHolder; import org.apache.druid.segment.Cursors; -import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.filter.ValueMatchers; -import org.joda.time.Interval; -import javax.annotation.Nullable; -import java.util.Collections; import java.util.Iterator; import java.util.List; public class IncrementalIndexCursorHolder implements CursorHolder { - private final IncrementalIndex index; + private final IncrementalIndexRowSelector rowSelector; private final CursorBuildSpec spec; private final List ordering; public IncrementalIndexCursorHolder( - IncrementalIndex index, + IncrementalIndexRowSelector rowSelector, CursorBuildSpec spec ) { - this.index = index; + this.rowSelector = rowSelector; this.spec = spec; - if (index.timePosition == 0) { + List ordering = rowSelector.getOrdering(); + if (Cursors.getTimeOrdering(ordering) != Order.NONE) { if (Cursors.preferDescendingTimeOrdering(spec)) { this.ordering = Cursors.descendingTimeOrder(); } else { this.ordering = Cursors.ascendingTimeOrder(); } } else { - // In principle, we could report a sort order here for certain types of fact holders; for example the - // RollupFactsHolder would be sorted by dimensions. However, this is left for future work. - this.ordering = Collections.emptyList(); + this.ordering = ordering; } } @Override public Cursor asCursor() { - if (index.isEmpty()) { + if (rowSelector.isEmpty()) { return null; } @@ -76,13 +70,10 @@ public Cursor asCursor() spec.getQueryMetrics().vectorized(false); } - return new IncrementalIndexCursor( - index, - spec.getVirtualColumns(), - Cursors.getTimeOrdering(ordering), - spec.getFilter(), - spec.getInterval() + rowSelector, + spec, + Cursors.getTimeOrdering(ordering) ); } @@ -94,11 +85,11 @@ public List getOrdering() static class IncrementalIndexCursor implements Cursor { - private IncrementalIndexRowHolder currEntry; + private final IncrementalIndexRowSelector rowSelector; + private final IncrementalIndexRowHolder currEntry; private final ColumnSelectorFactory columnSelectorFactory; private final ValueMatcher filterMatcher; private final int maxRowIndex; - private final IncrementalIndex.FactsHolder facts; private Iterator baseIter; private Iterable cursorIterable; private boolean emptyRange; @@ -106,30 +97,31 @@ static class IncrementalIndexCursor implements Cursor private boolean done; IncrementalIndexCursor( - IncrementalIndex index, - VirtualColumns virtualColumns, - Order timeOrder, - @Nullable Filter filter, - Interval actualInterval + IncrementalIndexRowSelector index, + CursorBuildSpec buildSpec, + Order timeOrder ) { currEntry = new IncrementalIndexRowHolder(); - columnSelectorFactory = new IncrementalIndexColumnSelectorFactory( - index, - virtualColumns, - timeOrder, - currEntry - ); // Set maxRowIndex before creating the filterMatcher. See https://github.com/apache/druid/pull/6340 maxRowIndex = index.getLastRowIndex(); - filterMatcher = filter == null ? ValueMatchers.allTrue() : filter.makeMatcher(columnSelectorFactory); numAdvanced = -1; - facts = index.getFacts(); - cursorIterable = facts.timeRangeIterable( + + rowSelector = index; + cursorIterable = rowSelector.getFacts().timeRangeIterable( timeOrder == Order.DESCENDING, - actualInterval.getStartMillis(), - actualInterval.getEndMillis() + buildSpec.getInterval().getStartMillis(), + buildSpec.getInterval().getEndMillis() ); + columnSelectorFactory = new IncrementalIndexColumnSelectorFactory( + rowSelector, + buildSpec.getVirtualColumns(), + timeOrder, + currEntry + ); + filterMatcher = buildSpec.getFilter() == null + ? ValueMatchers.allTrue() + : buildSpec.getFilter().makeMatcher(columnSelectorFactory); emptyRange = !cursorIterable.iterator().hasNext(); reset(); @@ -152,7 +144,7 @@ public void advance() while (baseIter.hasNext()) { BaseQuery.checkInterrupted(); - IncrementalIndexRow entry = baseIter.next(); + final IncrementalIndexRow entry = baseIter.next(); if (beyondMaxRowIndex(entry.getRowIndex())) { continue; } diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java index 89e94961f6b2..2e817b993ce0 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java @@ -144,6 +144,8 @@ public Object apply(@Nullable Object input) { if (input == null || (input.getClass().isArray() && Array.getLength(input) == 0)) { return Collections.singletonList("null"); + } else if (input instanceof int[]) { + return Arrays.toString((int[]) input); } return Collections.singletonList(input); } diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRowSelector.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRowSelector.java new file mode 100644 index 000000000000..bafa127e881e --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRowSelector.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.incremental; + +import org.apache.druid.query.OrderBy; +import org.apache.druid.segment.ColumnInspector; + +import javax.annotation.Nullable; +import java.util.List; + +/** + * Interface that abstracts selecting data from a {@link FactsHolder} + */ +public interface IncrementalIndexRowSelector extends ColumnInspector +{ + /** + * get {@link IncrementalIndex.DimensionDesc} for the specified column, if available, which provides access to things + * like {@link org.apache.druid.segment.DimensionIndexer} and {@link org.apache.druid.segment.DimensionHandler} as + * well as column capabilities and position within the row + */ + @Nullable + IncrementalIndex.DimensionDesc getDimension(String columnName); + + /** + * Get {@link IncrementalIndex.MetricDesc} which provides column capabilities and position in the aggregators section + * of the row + */ + @Nullable + IncrementalIndex.MetricDesc getMetric(String s); + + /** + * Ordering for the data in the facts table + */ + List getOrdering(); + + /** + * Are there any {@link IncrementalIndexRow} stored in the {@link FactsHolder}? + */ + boolean isEmpty(); + + /** + * Get the {@link FactsHolder} containing all of the {@link IncrementalIndexRow} backing this selector + */ + FactsHolder getFacts(); + + /** + * Highest value {@link IncrementalIndexRow#getRowIndex()} available in this selector. Note that these values do not + * reflect the position of the row in the {@link FactsHolder}, rather just the order in which they were processed + */ + int getLastRowIndex(); + + /** + * @param rowOffset row to get float aggregator value + * @param aggOffset position of the aggregator in the aggregators array of the data schema + * @return float value of the metric + */ + float getMetricFloatValue(int rowOffset, int aggOffset); + + /** + * @param rowOffset row to get long aggregator value + * @param aggOffset position of the aggregator in the aggregators array of the data schema + * @return long value of the aggregator for this row + */ + long getMetricLongValue(int rowOffset, int aggOffset); + + /** + * @param rowOffset row to get double aggregator value + * @param aggOffset position of the aggregator in the aggregators array of the data schema + * @return double value of the aggregator for this row + */ + double getMetricDoubleValue(int rowOffset, int aggOffset); + + /** + * @param rowOffset row to get long aggregator value + * @param aggOffset position of the aggregator in the aggregators array of the data schema + * @return long value of the aggregator for this row + */ + @Nullable + Object getMetricObjectValue(int rowOffset, int aggOffset); + + /** + * @param rowOffset row to check for a aggregator value + * @param aggOffset position of the aggregator in the aggregators array of the data schema + * @return is the value null for this row? + */ + boolean isNull(int rowOffset, int aggOffset); +} diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java index b5e580f44f2f..8c554e016fc4 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java @@ -155,7 +155,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex } else { this.facts = new PlainNonTimeOrderedFactsHolder(dimsComparator()); } - maxBytesPerRowForAggregators = + this.maxBytesPerRowForAggregators = useMaxMemoryEstimates ? getMaxBytesPerRowForAggregators(incrementalIndexSchema) : 0; this.useMaxMemoryEstimates = useMaxMemoryEstimates; } @@ -252,14 +252,15 @@ protected AddToFactsResult addToFacts( ) throws IndexSizeExceededException { final List parseExceptionMessages = new ArrayList<>(); + final AtomicLong totalSizeInBytes = getBytesInMemory(); + final int priorIndex = facts.getPriorIndex(key); Aggregator[] aggs; final AggregatorFactory[] metrics = getMetrics(); final AtomicInteger numEntries = getNumEntries(); - final AtomicLong totalSizeInBytes = getBytesInMemory(); if (IncrementalIndexRow.EMPTY_ROW_INDEX != priorIndex) { - aggs = concurrentGet(priorIndex); + aggs = aggregators.get(priorIndex); long aggSizeDelta = doAggregate(metrics, aggs, inputRowHolder, parseExceptionMessages); totalSizeInBytes.addAndGet(useMaxMemoryEstimates ? 0 : aggSizeDelta); } else { @@ -272,7 +273,7 @@ protected AddToFactsResult addToFacts( aggSizeForRow += doAggregate(metrics, aggs, inputRowHolder, parseExceptionMessages); final int rowIndex = indexIncrement.getAndIncrement(); - concurrentSet(rowIndex, aggs); + aggregators.put(rowIndex, aggs); // Last ditch sanity checks if ((numEntries.get() >= maxRowCount || totalSizeInBytes.get() >= maxBytesInMemory) @@ -363,6 +364,18 @@ private long doAggregate( InputRowHolder inputRowHolder, List parseExceptionsHolder ) + { + return doAggregate(metrics, aggs, inputRowHolder, parseExceptionsHolder, useMaxMemoryEstimates, preserveExistingMetrics); + } + + private static long doAggregate( + AggregatorFactory[] metrics, + Aggregator[] aggs, + InputRowHolder inputRowHolder, + List parseExceptionsHolder, + boolean useMaxMemoryEstimates, + boolean preserveExistingMetrics + ) { long totalIncrementalBytes = 0L; for (int i = 0; i < metrics.length; i++) { @@ -418,17 +431,6 @@ private void closeAggregators() } } - protected Aggregator[] concurrentGet(int offset) - { - // All get operations should be fine - return aggregators.get(offset); - } - - protected void concurrentSet(int offset, Aggregator[] value) - { - aggregators.put(offset, value); - } - @Override public boolean canAppendRow() { @@ -459,42 +461,53 @@ public String getOutOfRowsReason() return outOfRowsReason; } - protected Aggregator[] getAggsForRow(int rowOffset) - { - return concurrentGet(rowOffset); - } - @Override public float getMetricFloatValue(int rowOffset, int aggOffset) { - return ((Number) getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getFloat)).floatValue(); + return ((Number) getMetricHelper( + getMetricAggs(), + aggregators.get(rowOffset), + aggOffset, + Aggregator::getFloat + )).floatValue(); } @Override public long getMetricLongValue(int rowOffset, int aggOffset) { - return ((Number) getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getLong)).longValue(); + return ((Number) getMetricHelper( + getMetricAggs(), + aggregators.get(rowOffset), + aggOffset, + Aggregator::getLong + )).longValue(); } @Override - public Object getMetricObjectValue(int rowOffset, int aggOffset) + public double getMetricDoubleValue(int rowOffset, int aggOffset) { - return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::get); + return ((Number) getMetricHelper( + getMetricAggs(), + aggregators.get(rowOffset), + aggOffset, + Aggregator::getDouble + )).doubleValue(); } @Override - protected double getMetricDoubleValue(int rowOffset, int aggOffset) + public Object getMetricObjectValue(int rowOffset, int aggOffset) { - return ((Number) getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getDouble)).doubleValue(); + return getMetricHelper(getMetricAggs(), aggregators.get(rowOffset), aggOffset, Aggregator::get); } @Override public boolean isNull(int rowOffset, int aggOffset) { + final Aggregator[] aggs = aggregators.get(rowOffset); if (preserveExistingMetrics) { - return concurrentGet(rowOffset)[aggOffset].isNull() && concurrentGet(rowOffset)[aggOffset + getMetricAggs().length].isNull(); + return aggs[aggOffset].isNull() && aggs[aggOffset + getMetricAggs().length].isNull(); } else { - return concurrentGet(rowOffset)[aggOffset].isNull(); + return aggs[aggOffset].isNull(); } } @@ -535,7 +548,7 @@ public Iterable iterableWithPostAggregations( theVals.put(dimensionName, rowVals); } - Aggregator[] aggs = getAggsForRow(rowOffset); + Aggregator[] aggs = aggregators.get(rowOffset); int aggLength = preserveExistingMetrics ? aggs.length / 2 : aggs.length; for (int i = 0; i < aggLength; ++i) { theVals.put(metrics[i].getName(), getMetricHelper(metrics, aggs, i, Aggregator::get)); @@ -560,11 +573,16 @@ public Iterable iterableWithPostAggregations( * for aggregating from input into output field and the aggregator for combining already aggregated field, as needed */ @Nullable - private Object getMetricHelper(AggregatorFactory[] metrics, Aggregator[] aggs, int aggOffset, Function getMetricTypeFunction) + private Object getMetricHelper( + AggregatorFactory[] metrics, + Aggregator[] aggs, + int aggOffset, + Function getMetricTypeFunction + ) { if (preserveExistingMetrics) { - // Since the preserveExistingMetrics flag is set, we will have to check and possibly retrieve the aggregated values - // from two aggregators, the aggregator for aggregating from input into output field and the aggregator + // Since the preserveExistingMetrics flag is set, we will have to check and possibly retrieve the aggregated + // values from two aggregators, the aggregator for aggregating from input into output field and the aggregator // for combining already aggregated field if (aggs[aggOffset].isNull()) { // If the aggregator for aggregating from input into output field is null, then we get the value from the @@ -583,8 +601,8 @@ private Object getMetricHelper(AggregatorFactory[] metrics, Aggregator[] agg return aggregatorFactory.combine(aggregatedFromSource, aggregatedFromCombined); } } else { - // If preserveExistingMetrics flag is not set then we simply get metrics from the list of Aggregator, aggs, using the - // given aggOffset + // If preserveExistingMetrics flag is not set then we simply get metrics from the list of Aggregator, aggs, + // using the given aggOffset return getMetricTypeFunction.apply(aggs[aggOffset]); } } diff --git a/processing/src/test/java/org/apache/druid/frame/processor/test/TestFrameProcessorUtils.java b/processing/src/test/java/org/apache/druid/frame/processor/test/TestFrameProcessorUtils.java index ae748198aea3..baac335f0c42 100644 --- a/processing/src/test/java/org/apache/druid/frame/processor/test/TestFrameProcessorUtils.java +++ b/processing/src/test/java/org/apache/druid/frame/processor/test/TestFrameProcessorUtils.java @@ -26,10 +26,7 @@ import org.apache.druid.frame.Frame; import org.apache.druid.frame.FrameType; import org.apache.druid.frame.testutil.FrameSequenceBuilder; -import org.apache.druid.java.util.common.granularity.Granularities; -import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.segment.CursorFactory; -import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory; import org.apache.druid.segment.incremental.IncrementalIndexSchema; @@ -48,15 +45,11 @@ public static CursorFactory toCursorFactory(List inputRows) { final IncrementalIndex index = new OnheapIncrementalIndex.Builder() .setIndexSchema( - new IncrementalIndexSchema( - 0, - new TimestampSpec("__time", "millis", null), - Granularities.NONE, - VirtualColumns.EMPTY, - DimensionsSpec.builder().useSchemaDiscovery(true).build(), - new AggregatorFactory[0], - false - ) + IncrementalIndexSchema.builder() + .withTimestampSpec(new TimestampSpec("__time", "millis", null)) + .withDimensionsSpec(DimensionsSpec.builder().useSchemaDiscovery(true).build()) + .withRollup(false) + .build() ) .setMaxRowCount(1000) .build(); diff --git a/processing/src/test/java/org/apache/druid/segment/AutoTypeColumnIndexerTest.java b/processing/src/test/java/org/apache/druid/segment/AutoTypeColumnIndexerTest.java index 80b6d23d4b85..2c570981f656 100644 --- a/processing/src/test/java/org/apache/druid/segment/AutoTypeColumnIndexerTest.java +++ b/processing/src/test/java/org/apache/druid/segment/AutoTypeColumnIndexerTest.java @@ -26,8 +26,6 @@ import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.guice.BuiltInTypesModule; -import org.apache.druid.java.util.common.granularity.Granularities; -import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.segment.column.ColumnType; @@ -494,18 +492,17 @@ public void testNestedColumnIndexerSchemaDiscoveryTypeCoercion() throws IndexSiz long minTimestamp = System.currentTimeMillis(); IncrementalIndex index = new OnheapIncrementalIndex.Builder() .setIndexSchema( - new IncrementalIndexSchema( - minTimestamp, - new TimestampSpec(TIME_COL, "millis", null), - Granularities.NONE, - VirtualColumns.EMPTY, - DimensionsSpec.builder() - .setDimensions(ImmutableList.of(new AutoTypeColumnSchema(NESTED_COL, ColumnType.STRING))) - .useSchemaDiscovery(true) - .build(), - new AggregatorFactory[0], - false - ) + IncrementalIndexSchema.builder() + .withMinTimestamp(minTimestamp) + .withTimestampSpec(new TimestampSpec(TIME_COL, "millis", null)) + .withDimensionsSpec( + DimensionsSpec.builder() + .setDimensions(ImmutableList.of(new AutoTypeColumnSchema(NESTED_COL, ColumnType.STRING))) + .useSchemaDiscovery(true) + .build() + ) + .withRollup(false) + .build() ) .setMaxRowCount(1000) .build(); @@ -699,15 +696,16 @@ private static IncrementalIndex makeIncrementalIndex(long minTimestamp) { IncrementalIndex index = new OnheapIncrementalIndex.Builder() .setIndexSchema( - new IncrementalIndexSchema( - minTimestamp, - new TimestampSpec(TIME_COL, "millis", null), - Granularities.NONE, - VirtualColumns.EMPTY, - DimensionsSpec.builder().useSchemaDiscovery(true).build(), - new AggregatorFactory[0], - false - ) + IncrementalIndexSchema.builder() + .withMinTimestamp(minTimestamp) + .withTimestampSpec(new TimestampSpec(TIME_COL, "millis", null)) + .withDimensionsSpec( + DimensionsSpec.builder() + .useSchemaDiscovery(true) + .build() + ) + .withRollup(false) + .build() ) .setMaxRowCount(1000) .build(); diff --git a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerV4Test.java b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerV4Test.java index 2e9deab42b49..9fc9fc0f578d 100644 --- a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerV4Test.java +++ b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerV4Test.java @@ -26,8 +26,6 @@ import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.guice.BuiltInTypesModule; -import org.apache.druid.java.util.common.granularity.Granularities; -import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.segment.column.ColumnType; @@ -478,15 +476,16 @@ private static IncrementalIndex makeIncrementalIndex(long minTimestamp) { IncrementalIndex index = new OnheapIncrementalIndex.Builder() .setIndexSchema( - new IncrementalIndexSchema( - minTimestamp, - new TimestampSpec(TIME_COL, "millis", null), - Granularities.NONE, - VirtualColumns.EMPTY, - DimensionsSpec.builder().useSchemaDiscovery(true).build(), - new AggregatorFactory[0], - false - ) + IncrementalIndexSchema.builder() + .withMinTimestamp(minTimestamp) + .withTimestampSpec(new TimestampSpec(TIME_COL, "millis", null)) + .withDimensionsSpec( + DimensionsSpec.builder() + .useSchemaDiscovery(true) + .build() + ) + .withRollup(false) + .build() ) .setMaxRowCount(1000) .build(); diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java index 77e0470c5486..e1a7319cab18 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java @@ -25,9 +25,12 @@ import org.apache.druid.guice.BuiltInTypesModule; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.aggregation.AggregatorAndSize; import org.apache.druid.query.aggregation.LongMaxAggregator; import org.apache.druid.query.aggregation.LongMaxAggregatorFactory; import org.apache.druid.segment.CloserRule; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.testing.InitializedNullHandlingTest; import org.easymock.EasyMock; import org.junit.Rule; @@ -69,22 +72,39 @@ public void testOnHeapIncrementalIndexClose() throws Exception { // Prepare the mocks & set close() call count expectation to 1 Aggregator mockedAggregator = EasyMock.createMock(LongMaxAggregator.class); + EasyMock.expect(mockedAggregator.aggregateWithSize()).andReturn(0L).anyTimes(); mockedAggregator.close(); EasyMock.expectLastCall().times(1); - final IncrementalIndex genericIndex = indexCreator.createIndex( + + EasyMock.replay(mockedAggregator); + + final IncrementalIndex incrementalIndex = indexCreator.createIndex( new IncrementalIndexSchema.Builder() .withQueryGranularity(Granularities.MINUTE) - .withMetrics(new LongMaxAggregatorFactory("max", "max")) + .withMetrics(new LongMaxAggregatorFactory("max", "max") + { + @Override + protected Aggregator factorize(ColumnSelectorFactory metricFactory, ColumnValueSelector selector) + { + return mockedAggregator; + } + + @Override + public AggregatorAndSize factorizeWithSize(ColumnSelectorFactory metricFactory) + { + return new AggregatorAndSize(mockedAggregator, Long.BYTES); + } + }) .build() ); // This test is specific to the on-heap index - if (!(genericIndex instanceof OnheapIncrementalIndex)) { + if (!(incrementalIndex instanceof OnheapIncrementalIndex)) { return; } - final OnheapIncrementalIndex index = (OnheapIncrementalIndex) genericIndex; + final OnheapIncrementalIndex index = (OnheapIncrementalIndex) incrementalIndex; index.add(new MapBasedInputRow( 0, @@ -92,11 +112,7 @@ public void testOnHeapIncrementalIndexClose() throws Exception ImmutableMap.of("billy", 1, "max", 1) )); - // override the aggregators with the mocks - index.concurrentGet(0)[0] = mockedAggregator; - // close the indexer and validate the expectations - EasyMock.replay(mockedAggregator); index.close(); EasyMock.verify(mockedAggregator); } diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java index 80c8207ed605..f5779bf73629 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java @@ -28,10 +28,7 @@ import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.guice.BuiltInTypesModule; -import org.apache.druid.java.util.common.granularity.Granularities; -import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.segment.CloserRule; -import org.apache.druid.segment.VirtualColumns; import org.apache.druid.testing.InitializedNullHandlingTest; import org.junit.Assert; import org.junit.Rule; @@ -80,15 +77,11 @@ public void test() throws IndexSizeExceededException new StringDimensionSchema("string3", DimensionSchema.MultiValueHandling.SORTED_SET, true) ) ); - IncrementalIndexSchema schema = new IncrementalIndexSchema( - 0, - new TimestampSpec("ds", "auto", null), - Granularities.ALL, - VirtualColumns.EMPTY, - dimensionsSpec, - new AggregatorFactory[0], - false - ); + IncrementalIndexSchema schema = IncrementalIndexSchema.builder() + .withTimestampSpec(new TimestampSpec("ds", "auto", null)) + .withDimensionsSpec(dimensionsSpec) + .withRollup(false) + .build(); Map map = new HashMap() { @Override diff --git a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java index 632a848830d8..a3060f078a2c 100644 --- a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java +++ b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java @@ -25,7 +25,6 @@ import org.apache.druid.common.config.NullHandling; import org.apache.druid.common.guava.SettableSupplier; import org.apache.druid.data.input.MapBasedInputRow; -import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.granularity.Granularities; @@ -50,7 +49,6 @@ import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexCursorFactory; import org.apache.druid.segment.TestObjectColumnSelector; -import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ColumnType; @@ -620,15 +618,10 @@ public void test_incrementalIndexStringSelector() throws IndexSizeExceededExcept // underlying dimension selector. // This occurred during schemaless ingestion with spare dimension values and no explicit null rows, so the // conditions are replicated by this test. See https://github.com/apache/druid/pull/10248 for details - IncrementalIndexSchema schema = new IncrementalIndexSchema( - 0, - new TimestampSpec("time", "millis", DateTimes.nowUtc()), - Granularities.NONE, - VirtualColumns.EMPTY, - DimensionsSpec.EMPTY, - new AggregatorFactory[]{new CountAggregatorFactory("count")}, - true - ); + IncrementalIndexSchema schema = IncrementalIndexSchema.builder() + .withTimestampSpec(new TimestampSpec("time", "millis", DateTimes.nowUtc())) + .withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")}) + .build(); IncrementalIndex index = new OnheapIncrementalIndex.Builder().setMaxRowCount(100).setIndexSchema(schema).build(); index.add(