From dbed1b0f505b1d8258906b5738db4390e9caffd8 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 26 Jun 2024 17:28:36 -0700 Subject: [PATCH] Defer more expressions in vectorized groupBy. (#16338) * Defer more expressions in vectorized groupBy. This patch adds a way for columns to provide GroupByVectorColumnSelectors, which controls how the groupBy engine operates on them. This mechanism is used by ExpressionVirtualColumn to provide an ExpressionDeferredGroupByVectorColumnSelector that uses the inputs of an expression as the grouping key. The actual expression evaluation is deferred until the grouped ResultRow is created. A new context parameter "deferExpressionDimensions" allows users to control when this deferred selector is used. The default is "fixedWidthNonNumeric", which is a behavioral change from the prior behavior. Users can get the prior behavior by setting this to "singleString". * Fix style. * Add deferExpressionDimensions to SqlExpressionBenchmark. * Fix style. * Fix inspections. * Add more testing. * Use valueOrDefault. * Compute exprKeyBytes a bit lighter-weight. 
--- .../query/SqlExpressionBenchmark.java | 12 +- docs/querying/groupbyquery.md | 2 +- .../groupby/DeferExpressionDimensions.java | 197 ++++++++++++++++++ .../query/groupby/GroupByQueryConfig.java | 17 ++ .../vector/VectorGroupByEngine.java | 17 +- .../apache/druid/segment/VirtualColumn.java | 25 ++- ...yableIndexVectorColumnSelectorFactory.java | 31 +++ .../vector/VectorColumnSelectorFactory.java | 32 ++- ...onDeferredGroupByVectorColumnSelector.java | 123 +++++++++++ .../virtual/ExpressionVectorSelectors.java | 50 +++++ .../virtual/ExpressionVirtualColumn.java | 24 +++ .../query/groupby/GroupByQueryConfigTest.java | 6 + .../query/groupby/GroupByQueryRunnerTest.java | 34 +++ .../ExpressionVectorSelectorsTest.java | 38 +++- 14 files changed, 593 insertions(+), 15 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/groupby/DeferExpressionDimensions.java create mode 100644 processing/src/main/java/org/apache/druid/segment/virtual/ExpressionDeferredGroupByVectorColumnSelector.java diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java index d4c2e0906b4f..8b2172182a07 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java @@ -37,6 +37,7 @@ import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryRunnerFactoryConglomerate; +import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.segment.AutoTypeColumnSchema; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.QueryableIndex; @@ -236,6 +237,14 @@ public String getFormatString() }) private String schema; + @Param({ + "singleString", + "fixedWidth", + "fixedWidthNonNumeric", + "always" + }) + private String 
deferExpressionDimensions; + @Param({ // non-expression reference "0", @@ -414,7 +423,8 @@ public void querySql(Blackhole blackhole) { final Map context = ImmutableMap.of( QueryContexts.VECTORIZE_KEY, vectorize, - QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize + QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize, + GroupByQueryConfig.CTX_KEY_DEFER_EXPRESSION_DIMENSIONS, deferExpressionDimensions ); final String sql = QUERIES.get(Integer.parseInt(query)); try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, sql, context)) { diff --git a/docs/querying/groupbyquery.md b/docs/querying/groupbyquery.md index 935bd90a145b..a11f82d124ad 100644 --- a/docs/querying/groupbyquery.md +++ b/docs/querying/groupbyquery.md @@ -385,7 +385,7 @@ Supported query contexts: |`forceLimitPushDown`|When all fields in the orderby are part of the grouping key, the Broker will push limit application down to the Historical processes. When the sorting order uses fields that are not in the grouping key, applying this optimization can result in approximate results with unknown accuracy, so this optimization is disabled by default in that case. Enabling this context flag turns on limit push down for limit/orderbys that contain non-grouping key columns.|false| |`applyLimitPushDownToSegment`|If Broker pushes limit down to queryable nodes (historicals, peons) then limit results during segment scan. This context value can be used to override `druid.query.groupBy.applyLimitPushDownToSegment`.|true| |`groupByEnableMultiValueUnnesting`|Safety flag to enable/disable the implicit unnesting on multi value column's as part of the grouping key. 'true' indicates multi-value grouping keys are unnested. 
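'false' returns an error if a multi value column is found as part of the grouping key.|true| - +|`deferExpressionDimensions`|When an entry in `dimensions` references an `expression` virtual column, this property influences whether expression evaluation is deferred from cursor processing to the merge step. Options are:<ul><li>`fixedWidth`: Defer expressions whose inputs are all fixed-width (numeric or dictionary-encoded).</li><li>`fixedWidthNonNumeric`: Same as `fixedWidth`, except that expressions whose inputs are all numeric are deferred only when their output type is non-numeric.</li><li>`singleString`: Defer only string-typed expressions that have at most one input, which must be a dictionary-encoded string.</li><li>`always`: Defer all expressions.</li></ul><br />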
These properties only take effect when the `groupBy` query can be vectorized. Non-vectorized queries only defer string-typed expressions of single string inputs.|`fixedWidthNonNumeric`| #### Array based result rows diff --git a/processing/src/main/java/org/apache/druid/query/groupby/DeferExpressionDimensions.java b/processing/src/main/java/org/apache/druid/query/groupby/DeferExpressionDimensions.java new file mode 100644 index 000000000000..3c6621d90387 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/groupby/DeferExpressionDimensions.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.groupby; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.math.expr.ExprType; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.segment.ColumnInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.virtual.ExpressionPlan; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; + +import java.util.List; + +/** + * Controls deferral of {@link ExpressionVirtualColumn} in {@link GroupByQuery}. + */ +public enum DeferExpressionDimensions +{ + SINGLE_STRING("singleString") { + @Override + public boolean useDeferredGroupBySelector( + ExpressionPlan plan, + List requiredBindingsList, + ColumnInspector inspector + ) + { + return false; + } + }, + + /** + * Defer expressions when their input variables are all fixed-width types (primitive numbers, or dictionary encoded). + */ + FIXED_WIDTH("fixedWidth") { + @Override + public boolean useDeferredGroupBySelector( + ExpressionPlan plan, + List requiredBindingsList, + ColumnInspector inspector + ) + { + if (isInnatelyDeferrable(plan, requiredBindingsList, inspector)) { + return false; + } + + for (final String requiredBinding : requiredBindingsList) { + final ColumnCapabilities capabilities = inspector.getColumnCapabilities(requiredBinding); + if (capabilities == null) { + return false; + } + + if (!capabilities.isNumeric() && !capabilities.isDictionaryEncoded().isTrue()) { + // Not fixed-width. + return false; + } + } + + return true; + } + }, + + /** + * Defer expressions when their input variables are all fixed-width types (primitive numbers, or dictionary encoded), unless all inputs are numeric and the output type is numeric or unknown.
+ */ + FIXED_WIDTH_NON_NUMERIC("fixedWidthNonNumeric") { + @Override + public boolean useDeferredGroupBySelector( + ExpressionPlan plan, + List requiredBindingsList, + ColumnInspector inspector + ) + { + if (isInnatelyDeferrable(plan, requiredBindingsList, inspector)) { + return false; + } + + boolean allNumericInputs = true; + + for (final String requiredBinding : requiredBindingsList) { + final ColumnCapabilities capabilities = inspector.getColumnCapabilities(requiredBinding); + if (capabilities == null) { + return false; + } + + allNumericInputs = allNumericInputs && capabilities.isNumeric(); + + if (!capabilities.isNumeric() && !capabilities.isDictionaryEncoded().isTrue()) { + // Not fixed-width. + return false; + } + } + + return !allNumericInputs || (plan.getOutputType() != null && !plan.getOutputType().isNumeric()); + } + }, + + ALWAYS("always") { + @Override + public boolean useDeferredGroupBySelector( + ExpressionPlan plan, + List requiredBindingsList, + ColumnInspector inspector + ) + { + return !isInnatelyDeferrable(plan, requiredBindingsList, inspector); + } + }; + + public static final String JSON_KEY = "deferExpressionDimensions"; + + private final String jsonName; + + DeferExpressionDimensions(String jsonName) + { + this.jsonName = jsonName; + } + + @JsonCreator + public static DeferExpressionDimensions fromString(final String jsonName) + { + for (final DeferExpressionDimensions value : values()) { + if (value.jsonName.equals(jsonName)) { + return value; + } + } + + throw new IAE("Invalid value[%s] for[%s]", jsonName, JSON_KEY); + } + + public abstract boolean useDeferredGroupBySelector( + ExpressionPlan plan, + List requiredBindingsList, + ColumnInspector inspector + ); + + @Override + @JsonValue + public String toString() + { + return jsonName; + } + + /** + * Whether the given expression can be deferred innately by the selector created by + * {@link ExpressionVirtualColumn#makeSingleValueVectorDimensionSelector(DimensionSpec, 
VectorColumnSelectorFactory)}. + * + * In this case, all options for this enum return false from + * {@link #useDeferredGroupBySelector(ExpressionPlan, List, ColumnInspector)}, because there is no need to defer + * redundantly. + */ + private static boolean isInnatelyDeferrable( + ExpressionPlan plan, + List requiredBindingsList, + ColumnInspector inspector + ) + { + if (plan.getOutputType() != null + && plan.getOutputType().is(ExprType.STRING) + && requiredBindingsList.size() <= 1) { + for (final String requiredBinding : requiredBindingsList) { + final ColumnCapabilities requiredBindingCapabilities = inspector.getColumnCapabilities(requiredBinding); + + if (requiredBindingCapabilities == null + || !requiredBindingCapabilities.is(ValueType.STRING) + || !requiredBindingCapabilities.isDictionaryEncoded().isTrue()) { + return false; + } + } + + return true; + } else { + return false; + } + } +} diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java index 2fb4dfdd4d2a..9950695f28ce 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java @@ -27,6 +27,8 @@ import org.apache.druid.query.QueryContexts; import org.apache.druid.utils.JvmUtils; +import java.util.Optional; + /** * */ @@ -44,6 +46,7 @@ public class GroupByQueryConfig public static final String CTX_KEY_ARRAY_RESULT_ROWS = "resultAsArray"; public static final String CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING = "groupByEnableMultiValueUnnesting"; public static final String CTX_KEY_BUFFER_GROUPER_MAX_SIZE = "bufferGrouperMaxSize"; + public static final String CTX_KEY_DEFER_EXPRESSION_DIMENSIONS = "deferExpressionDimensions"; private static final String CTX_KEY_IS_SINGLE_THREADED = "groupByIsSingleThreaded"; private static final String CTX_KEY_BUFFER_GROUPER_INITIAL_BUCKETS = 
"bufferGrouperInitialBuckets"; private static final String CTX_KEY_BUFFER_GROUPER_MAX_LOAD_FACTOR = "bufferGrouperMaxLoadFactor"; @@ -119,6 +122,9 @@ public class GroupByQueryConfig @JsonProperty private boolean mergeThreadLocal = false; + @JsonProperty + private DeferExpressionDimensions deferExpressionDimensions = DeferExpressionDimensions.FIXED_WIDTH_NON_NUMERIC; + @JsonProperty private boolean vectorize = true; @@ -277,6 +283,11 @@ public boolean isMergeThreadLocal() return mergeThreadLocal; } + public DeferExpressionDimensions getDeferExpressionDimensions() + { + return deferExpressionDimensions; + } + public boolean isVectorize() { return vectorize; @@ -350,6 +361,10 @@ public GroupByQueryConfig withOverrides(final GroupByQuery query) getNumParallelCombineThreads() ); newConfig.mergeThreadLocal = queryContext.getBoolean(CTX_KEY_MERGE_THREAD_LOCAL, isMergeThreadLocal()); + newConfig.deferExpressionDimensions = + Optional.ofNullable(queryContext.getString(CTX_KEY_DEFER_EXPRESSION_DIMENSIONS)) + .map(DeferExpressionDimensions::fromString) + .orElse(getDeferExpressionDimensions()); newConfig.vectorize = queryContext.getBoolean(QueryContexts.VECTORIZE_KEY, isVectorize()); newConfig.enableMultiValueUnnesting = queryContext.getBoolean( CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING, @@ -378,6 +393,8 @@ public String toString() ", vectorize=" + vectorize + ", forcePushDownNestedQuery=" + forcePushDownNestedQuery + ", enableMultiValueUnnesting=" + enableMultiValueUnnesting + + ", mergeThreadLocal=" + mergeThreadLocal + + ", deferExpressionDimensions=" + deferExpressionDimensions + '}'; } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java index 7b28f782e31d..93633644a2ef 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java +++ 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java @@ -29,6 +29,7 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.aggregation.AggregatorAdapters; +import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.query.filter.Filter; import org.apache.druid.query.groupby.GroupByQuery; @@ -137,12 +138,22 @@ public void close() try { final VectorColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory(); final List dimensions = query.getDimensions().stream().map( - dimensionSpec -> - ColumnProcessors.makeVectorProcessor( + dimensionSpec -> { + if (dimensionSpec instanceof DefaultDimensionSpec) { + // Delegate creation of GroupByVectorColumnSelector to the column selector factory, so that + // virtual columns (like ExpressionVirtualColumn) can control their own grouping behavior. 
+ return columnSelectorFactory.makeGroupByVectorColumnSelector( + dimensionSpec.getDimension(), + config.getDeferExpressionDimensions() + ); + } else { + return ColumnProcessors.makeVectorProcessor( dimensionSpec, GroupByVectorColumnProcessorFactory.instance(), columnSelectorFactory - ) + ); + } + } ).collect(Collectors.toList()); return new VectorGroupByEngineIterator( diff --git a/processing/src/main/java/org/apache/druid/segment/VirtualColumn.java b/processing/src/main/java/org/apache/druid/segment/VirtualColumn.java index 3698a8a731bf..ca9408d14e4b 100644 --- a/processing/src/main/java/org/apache/druid/segment/VirtualColumn.java +++ b/processing/src/main/java/org/apache/druid/segment/VirtualColumn.java @@ -24,6 +24,8 @@ import org.apache.druid.java.util.common.Cacheable; import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.query.filter.ColumnIndexSelector; +import org.apache.druid.query.groupby.DeferExpressionDimensions; +import org.apache.druid.query.groupby.epinephelinae.vector.GroupByVectorColumnSelector; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnIndexSupplier; import org.apache.druid.segment.data.ReadableOffset; @@ -240,6 +242,26 @@ default VectorObjectSelector makeVectorObjectSelector( return null; } + /** + * Returns a group-by selector. Allows virtual columns to control their own grouping behavior. 
+ * + * @param columnName column name + * @param factory column selector factory + * @param deferExpressionDimensions active value of {@link org.apache.druid.query.groupby.GroupByQueryConfig#CTX_KEY_DEFER_EXPRESSION_DIMENSIONS} + * + * @return selector, or null if this virtual column does not have a specialized one + */ + @SuppressWarnings("unused") + @Nullable + default GroupByVectorColumnSelector makeGroupByVectorColumnSelector( + String columnName, + VectorColumnSelectorFactory factory, + DeferExpressionDimensions deferExpressionDimensions + ) + { + return null; + } + /** * This method is deprecated in favor of {@link #capabilities(ColumnInspector, String)}, which should be used whenever * possible and can support virtual column implementations that need to inspect other columns as inputs. @@ -265,8 +287,9 @@ default VectorObjectSelector makeVectorObjectSelector( * Examples of this include the {@link ExpressionVirtualColumn}, which takes input from other columns and uses the * {@link ColumnInspector} to infer the output type of expressions based on the types of the inputs. 
* - * @param inspector column inspector to provide additional information of other available columns + * @param inspector column inspector to provide additional information of other available columns * @param columnName the name this virtual column was referenced with + * * @return capabilities, must not be null */ @Nullable diff --git a/processing/src/main/java/org/apache/druid/segment/vector/QueryableIndexVectorColumnSelectorFactory.java b/processing/src/main/java/org/apache/druid/segment/vector/QueryableIndexVectorColumnSelectorFactory.java index 79d31ee479d8..3e5bedb5cdc0 100644 --- a/processing/src/main/java/org/apache/druid/segment/vector/QueryableIndexVectorColumnSelectorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/vector/QueryableIndexVectorColumnSelectorFactory.java @@ -21,8 +21,13 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.groupby.DeferExpressionDimensions; +import org.apache.druid.query.groupby.epinephelinae.vector.GroupByVectorColumnProcessorFactory; +import org.apache.druid.query.groupby.epinephelinae.vector.GroupByVectorColumnSelector; import org.apache.druid.segment.ColumnCache; +import org.apache.druid.segment.ColumnProcessors; import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.VirtualColumn; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.BaseColumn; import org.apache.druid.segment.column.ColumnCapabilities; @@ -247,6 +252,32 @@ public VectorObjectSelector makeObjectSelector(final String columnName) return columnValueSelector; } + @Override + public GroupByVectorColumnSelector makeGroupByVectorColumnSelector( + String column, + DeferExpressionDimensions deferExpressionDimensions + ) + { + GroupByVectorColumnSelector retVal = null; + + // Allow virtual columns to control their own grouping behavior. 
+ final VirtualColumn virtualColumn = virtualColumns.getVirtualColumn(column); + if (virtualColumn != null) { + retVal = virtualColumn.makeGroupByVectorColumnSelector(column, this, deferExpressionDimensions); + } + + // Generic case: use GroupByVectorColumnProcessorFactory.instance() to build selectors for primitive types. + if (retVal == null) { + retVal = ColumnProcessors.makeVectorProcessor( + column, + GroupByVectorColumnProcessorFactory.instance(), + this + ); + } + + return retVal; + } + @Nullable @Override public ColumnCapabilities getColumnCapabilities(final String columnName) diff --git a/processing/src/main/java/org/apache/druid/segment/vector/VectorColumnSelectorFactory.java b/processing/src/main/java/org/apache/druid/segment/vector/VectorColumnSelectorFactory.java index 56b14898ac61..de129d564bfd 100644 --- a/processing/src/main/java/org/apache/druid/segment/vector/VectorColumnSelectorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/vector/VectorColumnSelectorFactory.java @@ -20,7 +20,11 @@ package org.apache.druid.segment.vector; import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.groupby.DeferExpressionDimensions; +import org.apache.druid.query.groupby.epinephelinae.vector.GroupByVectorColumnProcessorFactory; +import org.apache.druid.query.groupby.epinephelinae.vector.GroupByVectorColumnSelector; import org.apache.druid.segment.ColumnInspector; +import org.apache.druid.segment.ColumnProcessors; import org.apache.druid.segment.column.ColumnCapabilities; import javax.annotation.Nullable; @@ -30,7 +34,7 @@ * * If you need to write code that adapts to different input types, you should write a * {@link org.apache.druid.segment.VectorColumnProcessorFactory} and use one of the - * {@link org.apache.druid.segment.ColumnProcessors#makeVectorProcessor} functions instead of using this class. + * {@link ColumnProcessors#makeVectorProcessor} functions instead of using this class. 
* * @see org.apache.druid.segment.ColumnSelectorFactory the non-vectorized version. */ @@ -61,7 +65,7 @@ default int getMaxVectorSize() * * If you need to write code that adapts to different input types, you should write a * {@link org.apache.druid.segment.VectorColumnProcessorFactory} and use one of the - * {@link org.apache.druid.segment.ColumnProcessors#makeVectorProcessor} functions instead of using this method. + * {@link ColumnProcessors#makeVectorProcessor} functions instead of using this method. */ SingleValueDimensionVectorSelector makeSingleValueDimensionSelector(DimensionSpec dimensionSpec); @@ -72,7 +76,7 @@ default int getMaxVectorSize() * * If you need to write code that adapts to different input types, you should write a * {@link org.apache.druid.segment.VectorColumnProcessorFactory} and use one of the - * {@link org.apache.druid.segment.ColumnProcessors#makeVectorProcessor} functions instead of using this method. + * {@link ColumnProcessors#makeVectorProcessor} functions instead of using this method. */ MultiValueDimensionVectorSelector makeMultiValueDimensionSelector(DimensionSpec dimensionSpec); @@ -82,7 +86,7 @@ default int getMaxVectorSize() * * If you need to write code that adapts to different input types, you should write a * {@link org.apache.druid.segment.VectorColumnProcessorFactory} and use one of the - * {@link org.apache.druid.segment.ColumnProcessors#makeVectorProcessor} functions instead of using this method. + * {@link ColumnProcessors#makeVectorProcessor} functions instead of using this method. */ VectorValueSelector makeValueSelector(String column); @@ -97,7 +101,7 @@ default int getMaxVectorSize() * * If you need to write code that adapts to different input types, you should write a * {@link org.apache.druid.segment.VectorColumnProcessorFactory} and use one of the - * {@link org.apache.druid.segment.ColumnProcessors#makeVectorProcessor} functions instead of using this method. 
+ * {@link ColumnProcessors#makeVectorProcessor} functions instead of using this method. */ VectorObjectSelector makeObjectSelector(String column); @@ -110,4 +114,22 @@ default int getMaxVectorSize() @Override @Nullable ColumnCapabilities getColumnCapabilities(String column); + + /** + * Returns a group-by selector. Allows columns to control their own grouping behavior. + * + * @param column column name + * @param deferExpressionDimensions active value of {@link org.apache.druid.query.groupby.GroupByQueryConfig#CTX_KEY_DEFER_EXPRESSION_DIMENSIONS} + */ + default GroupByVectorColumnSelector makeGroupByVectorColumnSelector( + String column, + DeferExpressionDimensions deferExpressionDimensions + ) + { + return ColumnProcessors.makeVectorProcessor( + column, + GroupByVectorColumnProcessorFactory.instance(), + this + ); + } } diff --git a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionDeferredGroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionDeferredGroupByVectorColumnSelector.java new file mode 100644 index 000000000000..82e8a1f8eb80 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionDeferredGroupByVectorColumnSelector.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.virtual; + +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.math.expr.Expr; +import org.apache.druid.math.expr.ExpressionType; +import org.apache.druid.math.expr.InputBindings; +import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer; +import org.apache.druid.query.groupby.epinephelinae.vector.GroupByVectorColumnSelector; +import org.apache.druid.segment.column.RowSignature; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Implementation of {@link GroupByVectorColumnSelector} that uses a wide key representing all expression inputs + * to enable deferring expression evaluation to {@link #writeKeyToResultRow(MemoryPointer, int, ResultRow, int)}. + * + * For example, the expression "coalesce(x, y)" would write a key composed of (x, y) in {@link #writeKeys}, then + * compute "coalesce(x, y)" in {@link #writeKeyToResultRow}. + */ +public class ExpressionDeferredGroupByVectorColumnSelector implements GroupByVectorColumnSelector +{ + private final Expr expr; + private final List subSelectors; + private final int exprKeyBytes; + + /** + * Used internally by {@link #writeKeyToResultRow(MemoryPointer, int, ResultRow, int)} to populate inputs + * for the expression. + */ + private final ResultRow tmpResultRow; + + /** + * Used internally by {@link #writeKeyToResultRow(MemoryPointer, int, ResultRow, int)} to evaluate the expression + * on {@link #tmpResultRow}. 
+ */ + private final Expr.ObjectBinding tmpResultRowBindings; + + ExpressionDeferredGroupByVectorColumnSelector( + final Expr expr, + final RowSignature exprInputSignature, + final List subSelectors + ) + { + this.expr = expr; + this.subSelectors = subSelectors; + this.tmpResultRow = ResultRow.create(subSelectors.size()); + + int exprKeyBytesTmp = 0; + final Map> tmpResultRowSuppliers = new HashMap<>(); + for (int i = 0; i < exprInputSignature.size(); i++) { + final int columnPosition = i; + exprKeyBytesTmp += subSelectors.get(i).getGroupingKeySize(); + tmpResultRowSuppliers.put( + exprInputSignature.getColumnName(i), + InputBindings.inputSupplier( + ExpressionType.fromColumnType(exprInputSignature.getColumnType(columnPosition).orElse(null)), + () -> tmpResultRow.getArray()[columnPosition] + ) + ); + } + this.exprKeyBytes = exprKeyBytesTmp; + this.tmpResultRowBindings = InputBindings.forInputSuppliers(tmpResultRowSuppliers); + } + + @Override + public int getGroupingKeySize() + { + return exprKeyBytes; + } + + @Override + public int writeKeys(WritableMemory keySpace, int keySize, int keyOffset, int startRow, int endRow) + { + int retVal = 0; + for (final GroupByVectorColumnSelector subSelector : subSelectors) { + retVal += subSelector.writeKeys(keySpace, keySize, keyOffset, startRow, endRow); + keyOffset += subSelector.getGroupingKeySize(); + } + return retVal; + } + + @Override + public void writeKeyToResultRow(MemoryPointer keyMemory, int keyOffset, ResultRow resultRow, int resultRowPosition) + { + for (int i = 0; i < subSelectors.size(); i++) { + final GroupByVectorColumnSelector subSelector = subSelectors.get(i); + subSelector.writeKeyToResultRow(keyMemory, keyOffset, tmpResultRow, i); + keyOffset += subSelector.getGroupingKeySize(); + } + + resultRow.getArray()[resultRowPosition] = expr.eval(tmpResultRowBindings).valueOrDefault(); + } + + @Override + public void reset() + { + for (final GroupByVectorColumnSelector subSelector : subSelectors) { + 
subSelector.reset(); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVectorSelectors.java b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVectorSelectors.java index 654a734e3750..5c022cbf3055 100644 --- a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVectorSelectors.java +++ b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVectorSelectors.java @@ -28,8 +28,11 @@ import org.apache.druid.math.expr.vector.ExprVectorProcessor; import org.apache.druid.math.expr.vector.VectorProcessors; import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.groupby.DeferExpressionDimensions; +import org.apache.druid.query.groupby.epinephelinae.vector.GroupByVectorColumnSelector; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.vector.ConstantVectorSelectors; import org.apache.druid.segment.vector.ReadableVectorInspector; import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; @@ -37,6 +40,8 @@ import org.apache.druid.segment.vector.VectorObjectSelector; import org.apache.druid.segment.vector.VectorValueSelector; +import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.List; public class ExpressionVectorSelectors @@ -107,6 +112,51 @@ public static VectorObjectSelector makeVectorObjectSelector( return new ExpressionVectorObjectSelector(processor, bindings); } + /** + * Creates a {@link ExpressionDeferredGroupByVectorColumnSelector} for the provided expression, if the + * provided {@link DeferExpressionDimensions} says we should. 
+ * + * @param factory column selector factory + * @param expression expression + * @param deferExpressionDimensions active value of {@link org.apache.druid.query.groupby.GroupByQueryConfig#CTX_KEY_DEFER_EXPRESSION_DIMENSIONS} + * + * @return selector, or null if the {@link DeferExpressionDimensions} determines we should not defer the expression + */ + @Nullable + public static GroupByVectorColumnSelector makeGroupByVectorColumnSelector( + VectorColumnSelectorFactory factory, + Expr expression, + DeferExpressionDimensions deferExpressionDimensions + ) + { + final ExpressionPlan plan = ExpressionPlanner.plan(factory, expression); + Preconditions.checkArgument(plan.is(ExpressionPlan.Trait.VECTORIZABLE)); + + final List requiredBindings = plan.getAnalysis().getRequiredBindingsList(); + + if (!deferExpressionDimensions.useDeferredGroupBySelector(plan, requiredBindings, factory)) { + return null; + } + + final RowSignature.Builder requiredBindingsSignatureBuilder = RowSignature.builder(); + final List subSelectors = new ArrayList<>(); + + for (final String columnName : requiredBindings) { + final ColumnCapabilities capabilities = factory.getColumnCapabilities(columnName); + final ColumnType columnType = capabilities != null ? 
capabilities.toColumnType() : ColumnType.STRING; + final GroupByVectorColumnSelector subSelector = + factory.makeGroupByVectorColumnSelector(columnName, deferExpressionDimensions); + requiredBindingsSignatureBuilder.add(columnName, columnType); + subSelectors.add(subSelector); + } + + return new ExpressionDeferredGroupByVectorColumnSelector( + expression.asSingleThreaded(factory), + requiredBindingsSignatureBuilder.build(), + subSelectors + ); + } + public static VectorObjectSelector castValueSelectorToObject( ReadableVectorInspector inspector, String columnName, diff --git a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVirtualColumn.java b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVirtualColumn.java index 589520612398..42a723907b9d 100644 --- a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVirtualColumn.java +++ b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVirtualColumn.java @@ -35,6 +35,8 @@ import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.query.filter.ColumnIndexSelector; +import org.apache.druid.query.groupby.DeferExpressionDimensions; +import org.apache.druid.query.groupby.epinephelinae.vector.GroupByVectorColumnSelector; import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; @@ -242,6 +244,28 @@ public VectorObjectSelector makeVectorObjectSelector(String columnName, VectorCo return ExpressionVectorSelectors.makeVectorObjectSelector(factory, parsedExpression.get()); } + @Nullable + @Override + public GroupByVectorColumnSelector makeGroupByVectorColumnSelector( + String columnName, + VectorColumnSelectorFactory factory, + DeferExpressionDimensions deferExpressionDimensions + ) + { + if (isDirectAccess(factory)) { + return factory.makeGroupByVectorColumnSelector( + 
parsedExpression.get().getBindingIfIdentifier(), + deferExpressionDimensions + ); + } + + return ExpressionVectorSelectors.makeGroupByVectorColumnSelector( + factory, + parsedExpression.get(), + deferExpressionDimensions + ); + } + @Nullable @Override public ColumnIndexSupplier getIndexSupplier( diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryConfigTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryConfigTest.java index 34ed99cda211..49b0b035f37b 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryConfigTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryConfigTest.java @@ -75,6 +75,7 @@ public void testNoOverrides() Assert.assertEquals(5, config2.getConfiguredMaxSelectorDictionarySize()); Assert.assertEquals(6_000_000, config2.getConfiguredMaxMergingDictionarySize()); Assert.assertEquals(7.0, config2.getBufferGrouperMaxLoadFactor(), 0.0); + Assert.assertEquals(DeferExpressionDimensions.FIXED_WIDTH_NON_NUMERIC, config2.getDeferExpressionDimensions()); Assert.assertFalse(config2.isApplyLimitPushDownToSegment()); } @@ -94,6 +95,10 @@ public void testOverrides() .put("maxSelectorDictionarySize", 3) .put("maxMergingDictionarySize", 4) .put("applyLimitPushDownToSegment", true) + .put( + GroupByQueryConfig.CTX_KEY_DEFER_EXPRESSION_DIMENSIONS, + DeferExpressionDimensions.ALWAYS.toString() + ) .build() ) .build() @@ -105,6 +110,7 @@ public void testOverrides() Assert.assertEquals(3, config2.getConfiguredMaxSelectorDictionarySize()); Assert.assertEquals(4, config2.getConfiguredMaxMergingDictionarySize()); Assert.assertEquals(7.0, config2.getBufferGrouperMaxLoadFactor(), 0.0); + Assert.assertEquals(DeferExpressionDimensions.ALWAYS, config2.getDeferExpressionDimensions()); Assert.assertTrue(config2.isApplyLimitPushDownToSegment()); } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 3613246fef65..d4dc87341306 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -9846,6 +9846,40 @@ public void testGroupByCardinalityAggOnMultiStringExpression() TestHelper.assertExpectedObjects(expectedResults, results, "cardinality-agg"); } + @Test + public void testGroupByDimensionOnMultiStringExpression() + { + GroupByQuery query = makeQueryBuilder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setVirtualColumns( + new ExpressionVirtualColumn("v0", "concat(quality,market)", ColumnType.STRING, TestExprMacroTable.INSTANCE) + ) + .setDimensions(new DefaultDimensionSpec("v0", "d0")) + .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT) + .setGranularity(QueryRunnerTestHelper.ALL_GRAN) + .build(); + + List<ResultRow> expectedResults = ImmutableList.of( + makeRow(query, "2011-04-01", "d0", "automotivespot", "rows", 2L), + makeRow(query, "2011-04-01", "d0", "businessspot", "rows", 2L), + makeRow(query, "2011-04-01", "d0", "entertainmentspot", "rows", 2L), + makeRow(query, "2011-04-01", "d0", "healthspot", "rows", 2L), + makeRow(query, "2011-04-01", "d0", "mezzaninespot", "rows", 2L), + makeRow(query, "2011-04-01", "d0", "mezzaninetotal_market", "rows", 2L), + makeRow(query, "2011-04-01", "d0", "mezzanineupfront", "rows", 2L), + makeRow(query, "2011-04-01", "d0", "newsspot", "rows", 2L), + makeRow(query, "2011-04-01", "d0", "premiumspot", "rows", 2L), + makeRow(query, "2011-04-01", "d0", "premiumtotal_market", "rows", 2L), + makeRow(query, "2011-04-01", "d0", "premiumupfront", "rows", 2L), + makeRow(query, "2011-04-01", "d0", "technologyspot", "rows", 2L), + makeRow(query, "2011-04-01", "d0", "travelspot", "rows", 2L) + ); + + Iterable<ResultRow> results = 
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, "cardinality-agg"); + } + @Test public void testGroupByCardinalityAggOnHyperUnique() { diff --git a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionVectorSelectorsTest.java b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionVectorSelectorsTest.java index cd8b78ed1f97..6d27439f8a61 100644 --- a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionVectorSelectorsTest.java +++ b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionVectorSelectorsTest.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.virtual; import com.google.common.collect.ImmutableList; +import org.apache.datasketches.memory.WritableMemory; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.io.Closer; @@ -29,6 +30,10 @@ import org.apache.druid.math.expr.Parser; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.expression.TestExprMacroTable; +import org.apache.druid.query.groupby.DeferExpressionDimensions; +import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer; +import org.apache.druid.query.groupby.epinephelinae.vector.GroupByVectorColumnSelector; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.DeprecatedQueryableIndexColumnSelector; @@ -260,14 +265,17 @@ public static void sanityTestVectorizedExpressionSelectors( } else { objectSelector = cursor.getColumnSelectorFactory().makeObjectSelector("v"); } + GroupByVectorColumnSelector groupBySelector = + cursor.getColumnSelectorFactory().makeGroupByVectorColumnSelector("v", DeferExpressionDimensions.ALWAYS); while (!cursor.isDone()) { + final List<Object> 
resultsVector = new ArrayList<>(); boolean[] nulls; switch (outputType.getType()) { case LONG: nulls = selector.getNullVector(); long[] longs = selector.getLongVector(); for (int i = 0; i < selector.getCurrentVectorSize(); i++, rowCount++) { - results.add(nulls != null && nulls[i] ? null : longs[i]); + resultsVector.add(nulls != null && nulls[i] ? null : longs[i]); } break; case DOUBLE: @@ -276,24 +284,26 @@ public static void sanityTestVectorizedExpressionSelectors( nulls = selector.getNullVector(); float[] floats = selector.getFloatVector(); for (int i = 0; i < selector.getCurrentVectorSize(); i++, rowCount++) { - results.add(nulls != null && nulls[i] ? null : (double) floats[i]); + resultsVector.add(nulls != null && nulls[i] ? null : (double) floats[i]); } } else { nulls = selector.getNullVector(); double[] doubles = selector.getDoubleVector(); for (int i = 0; i < selector.getCurrentVectorSize(); i++, rowCount++) { - results.add(nulls != null && nulls[i] ? null : doubles[i]); + resultsVector.add(nulls != null && nulls[i] ? 
null : doubles[i]); } } break; case STRING: Object[] objects = objectSelector.getObjectVector(); for (int i = 0; i < objectSelector.getCurrentVectorSize(); i++, rowCount++) { - results.add(objects[i]); + resultsVector.add(objects[i]); } break; } + verifyGroupBySelector(groupBySelector, resultsVector); + results.addAll(resultsVector); cursor.advance(); } } @@ -328,4 +338,24 @@ public static void sanityTestVectorizedExpressionSelectors( Assert.assertTrue(rowCountCursor > 0); Assert.assertEquals(rowCountCursor, rowCount); } + + private static void verifyGroupBySelector( + final GroupByVectorColumnSelector groupBySelector, + final List<Object> expectedResults + ) + { + final int keyOffset = 1; + final int keySize = groupBySelector.getGroupingKeySize() + keyOffset + 1; // 1 byte before, 1 byte after + final WritableMemory keySpace = + WritableMemory.allocate(keySize * expectedResults.size()); + + final int writeKeysRetVal = groupBySelector.writeKeys(keySpace, keySize, keyOffset, 0, expectedResults.size()); + Assert.assertEquals(0, writeKeysRetVal); + + for (int i = 0; i < expectedResults.size(); i++) { + final ResultRow resultRow = ResultRow.create(1); + groupBySelector.writeKeyToResultRow(new MemoryPointer(keySpace, (long) keySize * i), keyOffset, resultRow, 0); + Assert.assertEquals("row #" + i, expectedResults.get(i), resultRow.getArray()[0]); + } + } }