Skip to content

Commit

Permalink
Defer more expressions in vectorized groupBy. (apache#16338)
Browse files Browse the repository at this point in the history
* Defer more expressions in vectorized groupBy.

This patch adds a way for columns to provide GroupByVectorColumnSelectors,
which control how the groupBy engine operates on them. This mechanism is used
by ExpressionVirtualColumn to provide an ExpressionDeferredGroupByVectorColumnSelector
that uses the inputs of an expression as the grouping key. The actual expression
evaluation is deferred until the grouped ResultRow is created.

A new context parameter "deferExpressionDimensions" allows users to control when
this deferred selector is used. The default is "fixedWidthNonNumeric", which
changes the prior behavior. Users can restore the prior behavior by setting
this parameter to "singleString".

* Fix style.

* Add deferExpressionDimensions to SqlExpressionBenchmark.

* Fix style.

* Fix inspections.

* Add more testing.

* Use valueOrDefault.

* Compute exprKeyBytes a bit lighter-weight.
  • Loading branch information
gianm authored Jun 27, 2024
1 parent d4f2636 commit dbed1b0
Show file tree
Hide file tree
Showing 14 changed files with 593 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.segment.AutoTypeColumnSchema;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
Expand Down Expand Up @@ -236,6 +237,14 @@ public String getFormatString()
})
private String schema;

// Benchmark parameter: each value is passed to the query context in querySql under
// GroupByQueryConfig.CTX_KEY_DEFER_EXPRESSION_DIMENSIONS.
@Param({
"singleString",
"fixedWidth",
"fixedWidthNonNumeric",
"always"
})
private String deferExpressionDimensions;

@Param({
// non-expression reference
"0",
Expand Down Expand Up @@ -414,7 +423,8 @@ public void querySql(Blackhole blackhole)
{
final Map<String, Object> context = ImmutableMap.of(
QueryContexts.VECTORIZE_KEY, vectorize,
QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize
QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize,
GroupByQueryConfig.CTX_KEY_DEFER_EXPRESSION_DIMENSIONS, deferExpressionDimensions
);
final String sql = QUERIES.get(Integer.parseInt(query));
try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, sql, context)) {
Expand Down
2 changes: 1 addition & 1 deletion docs/querying/groupbyquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ Supported query contexts:
|`forceLimitPushDown`|When all fields in the orderby are part of the grouping key, the Broker will push limit application down to the Historical processes. When the sorting order uses fields that are not in the grouping key, applying this optimization can result in approximate results with unknown accuracy, so this optimization is disabled by default in that case. Enabling this context flag turns on limit push down for limit/orderbys that contain non-grouping key columns.|false|
|`applyLimitPushDownToSegment`|If Broker pushes limit down to queryable nodes (historicals, peons) then limit results during segment scan. This context value can be used to override `druid.query.groupBy.applyLimitPushDownToSegment`.|true|
|`groupByEnableMultiValueUnnesting`|Safety flag to enable/disable the implicit unnesting of multi-value columns as part of the grouping key. 'true' indicates multi-value grouping keys are unnested. 'false' returns an error if a multi-value column is found as part of the grouping key.|true|

|`deferExpressionDimensions`|When an entry in `dimensions` references an `expression` virtual column, this property influences whether expression evaluation is deferred from cursor processing to the merge step. Options are:<ul><li>`fixedWidth`: Defer expressions with fixed-width inputs (numeric and dictionary-encoded string).</li><li>`fixedWidthNonNumeric`: Defer expressions with fixed-width inputs (numeric and dictionary-encoded string), unless the expression output and all inputs are numeric.</li><li>`singleString`: Defer string-typed expressions with a single dictionary-encoded string input.</li><li>`always`: Defer all expressions. May require building dictionaries for expression inputs.</li></ul><br />These options only take effect when the `groupBy` query can be vectorized. Non-vectorized queries only defer string-typed expressions with a single string input.|`fixedWidthNonNumeric`|

#### Array based result rows

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.groupby;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.math.expr.ExprType;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.virtual.ExpressionPlan;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;

import java.util.List;

/**
 * Controls deferral of {@link ExpressionVirtualColumn} in {@link GroupByQuery}.
 *
 * Each constant decides, through
 * {@link #useDeferredGroupBySelector(ExpressionPlan, List, ColumnInspector)}, whether an expression used as a
 * grouping dimension should be grouped on its inputs, with evaluation of the expression itself deferred. Constants
 * are serialized to and from JSON using the name returned by {@link #toString()}.
 */
public enum DeferExpressionDimensions
{
  /**
   * Never use the deferred group-by selector. Expressions that are innately deferrable (see
   * {@link #isInnatelyDeferrable(ExpressionPlan, List, ColumnInspector)}) are handled by the regular dimension
   * selector, so nothing additional is requested here.
   */
  SINGLE_STRING("singleString") {
    @Override
    public boolean useDeferredGroupBySelector(
        ExpressionPlan plan,
        List<String> requiredBindingsList,
        ColumnInspector inspector
    )
    {
      return false;
    }
  },

  /**
   * Defer expressions when their input variables are all fixed-width types (primitive numbers, or dictionary encoded).
   */
  FIXED_WIDTH("fixedWidth") {
    @Override
    public boolean useDeferredGroupBySelector(
        ExpressionPlan plan,
        List<String> requiredBindingsList,
        ColumnInspector inspector
    )
    {
      if (isInnatelyDeferrable(plan, requiredBindingsList, inspector)) {
        return false;
      }

      for (final String requiredBinding : requiredBindingsList) {
        if (!isFixedWidth(inspector.getColumnCapabilities(requiredBinding))) {
          return false;
        }
      }

      return true;
    }
  },

  /**
   * Defer expressions when their input variables are all fixed-width types (primitive numbers, or dictionary encoded),
   * unless every input is numeric and the output is not known to be non-numeric. In other words, with all-numeric
   * inputs the expression is deferred only when the plan reports a non-null, non-numeric output type.
   */
  FIXED_WIDTH_NON_NUMERIC("fixedWidthNonNumeric") {
    @Override
    public boolean useDeferredGroupBySelector(
        ExpressionPlan plan,
        List<String> requiredBindingsList,
        ColumnInspector inspector
    )
    {
      if (isInnatelyDeferrable(plan, requiredBindingsList, inspector)) {
        return false;
      }

      boolean allNumericInputs = true;

      for (final String requiredBinding : requiredBindingsList) {
        final ColumnCapabilities capabilities = inspector.getColumnCapabilities(requiredBinding);
        if (!isFixedWidth(capabilities)) {
          return false;
        }

        // isFixedWidth has already rejected null capabilities, so this dereference is safe.
        allNumericInputs = allNumericInputs && capabilities.isNumeric();
      }

      return !allNumericInputs || (plan.getOutputType() != null && !plan.getOutputType().isNumeric());
    }
  },

  /**
   * Defer every expression, except those that are already innately deferrable (see
   * {@link #isInnatelyDeferrable(ExpressionPlan, List, ColumnInspector)}). No check is made on the input types.
   */
  ALWAYS("always") {
    @Override
    public boolean useDeferredGroupBySelector(
        ExpressionPlan plan,
        List<String> requiredBindingsList,
        ColumnInspector inspector
    )
    {
      return !isInnatelyDeferrable(plan, requiredBindingsList, inspector);
    }
  };

  public static final String JSON_KEY = "deferExpressionDimensions";

  private final String jsonName;

  DeferExpressionDimensions(String jsonName)
  {
    this.jsonName = jsonName;
  }

  /**
   * Resolves a constant from its JSON name. Matching is exact (case-sensitive).
   *
   * @param jsonName JSON name, such as "fixedWidth"
   *
   * @return the matching constant
   *
   * @throws IAE if no constant has the given name, including when the name is null
   */
  @JsonCreator
  public static DeferExpressionDimensions fromString(final String jsonName)
  {
    for (final DeferExpressionDimensions value : values()) {
      if (value.jsonName.equals(jsonName)) {
        return value;
      }
    }

    throw new IAE("Invalid value[%s] for[%s]", jsonName, JSON_KEY);
  }

  /**
   * Whether the groupBy engine should use a deferred group-by selector for the given expression.
   *
   * @param plan                 plan for the expression
   * @param requiredBindingsList names of the input columns required by the expression
   * @param inspector            inspector used to look up the capabilities of each required input
   *
   * @return true if the deferred selector should be used
   */
  public abstract boolean useDeferredGroupBySelector(
      ExpressionPlan plan,
      List<String> requiredBindingsList,
      ColumnInspector inspector
  );

  /**
   * Returns the JSON name of this constant; also used as its JSON serialized form.
   */
  @Override
  @JsonValue
  public String toString()
  {
    return jsonName;
  }

  /**
   * Whether a column with the given capabilities counts as fixed-width for deferral purposes: it must be known
   * (non-null) and either numeric or definitely dictionary-encoded.
   */
  private static boolean isFixedWidth(final ColumnCapabilities capabilities)
  {
    return capabilities != null
           && (capabilities.isNumeric() || capabilities.isDictionaryEncoded().isTrue());
  }

  /**
   * Whether the given expression can be deferred innately by the selector created by
   * {@link ExpressionVirtualColumn#makeSingleValueVectorDimensionSelector(DimensionSpec, VectorColumnSelectorFactory)}.
   *
   * In this case, all options for this enum return false from
   * {@link #useDeferredGroupBySelector(ExpressionPlan, List, ColumnInspector)}, because there is no need to defer
   * redundantly.
   *
   * This holds when the expression output is a string and it has at most one input, which (if present) is a
   * dictionary-encoded string column.
   */
  private static boolean isInnatelyDeferrable(
      ExpressionPlan plan,
      List<String> requiredBindingsList,
      ColumnInspector inspector
  )
  {
    if (plan.getOutputType() == null
        || !plan.getOutputType().is(ExprType.STRING)
        || requiredBindingsList.size() > 1) {
      return false;
    }

    // At most one iteration, given the size check above.
    for (final String requiredBinding : requiredBindingsList) {
      final ColumnCapabilities requiredBindingCapabilities = inspector.getColumnCapabilities(requiredBinding);

      if (requiredBindingCapabilities == null
          || !requiredBindingCapabilities.is(ValueType.STRING)
          || !requiredBindingCapabilities.isDictionaryEncoded().isTrue()) {
        return false;
      }
    }

    return true;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.druid.query.QueryContexts;
import org.apache.druid.utils.JvmUtils;

import java.util.Optional;

/**
*
*/
Expand All @@ -44,6 +46,7 @@ public class GroupByQueryConfig
public static final String CTX_KEY_ARRAY_RESULT_ROWS = "resultAsArray";
public static final String CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING = "groupByEnableMultiValueUnnesting";
public static final String CTX_KEY_BUFFER_GROUPER_MAX_SIZE = "bufferGrouperMaxSize";
public static final String CTX_KEY_DEFER_EXPRESSION_DIMENSIONS = "deferExpressionDimensions";
private static final String CTX_KEY_IS_SINGLE_THREADED = "groupByIsSingleThreaded";
private static final String CTX_KEY_BUFFER_GROUPER_INITIAL_BUCKETS = "bufferGrouperInitialBuckets";
private static final String CTX_KEY_BUFFER_GROUPER_MAX_LOAD_FACTOR = "bufferGrouperMaxLoadFactor";
Expand Down Expand Up @@ -119,6 +122,9 @@ public class GroupByQueryConfig
@JsonProperty
private boolean mergeThreadLocal = false;

// Default deferral mode for expression dimensions. withOverrides replaces this with the value of the
// "deferExpressionDimensions" query context parameter when one is present.
@JsonProperty
private DeferExpressionDimensions deferExpressionDimensions = DeferExpressionDimensions.FIXED_WIDTH_NON_NUMERIC;

@JsonProperty
private boolean vectorize = true;

Expand Down Expand Up @@ -277,6 +283,11 @@ public boolean isMergeThreadLocal()
return mergeThreadLocal;
}

/**
* Returns the {@link DeferExpressionDimensions} mode held by this config.
*/
public DeferExpressionDimensions getDeferExpressionDimensions()
{
return deferExpressionDimensions;
}

public boolean isVectorize()
{
return vectorize;
Expand Down Expand Up @@ -350,6 +361,10 @@ public GroupByQueryConfig withOverrides(final GroupByQuery query)
getNumParallelCombineThreads()
);
newConfig.mergeThreadLocal = queryContext.getBoolean(CTX_KEY_MERGE_THREAD_LOCAL, isMergeThreadLocal());
newConfig.deferExpressionDimensions =
Optional.ofNullable(queryContext.getString(CTX_KEY_DEFER_EXPRESSION_DIMENSIONS))
.map(DeferExpressionDimensions::fromString)
.orElse(getDeferExpressionDimensions());
newConfig.vectorize = queryContext.getBoolean(QueryContexts.VECTORIZE_KEY, isVectorize());
newConfig.enableMultiValueUnnesting = queryContext.getBoolean(
CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING,
Expand Down Expand Up @@ -378,6 +393,8 @@ public String toString()
", vectorize=" + vectorize +
", forcePushDownNestedQuery=" + forcePushDownNestedQuery +
", enableMultiValueUnnesting=" + enableMultiValueUnnesting +
", mergeThreadLocal=" + mergeThreadLocal +
", deferExpressionDimensions=" + deferExpressionDimensions +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.groupby.GroupByQuery;
Expand Down Expand Up @@ -137,12 +138,22 @@ public void close()
try {
final VectorColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory();
final List<GroupByVectorColumnSelector> dimensions = query.getDimensions().stream().map(
dimensionSpec ->
ColumnProcessors.makeVectorProcessor(
dimensionSpec -> {
if (dimensionSpec instanceof DefaultDimensionSpec) {
// Delegate creation of GroupByVectorColumnSelector to the column selector factory, so that
// virtual columns (like ExpressionVirtualColumn) can control their own grouping behavior.
return columnSelectorFactory.makeGroupByVectorColumnSelector(
dimensionSpec.getDimension(),
config.getDeferExpressionDimensions()
);
} else {
return ColumnProcessors.makeVectorProcessor(
dimensionSpec,
GroupByVectorColumnProcessorFactory.instance(),
columnSelectorFactory
)
);
}
}
).collect(Collectors.toList());

return new VectorGroupByEngineIterator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.druid.java.util.common.Cacheable;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.ColumnIndexSelector;
import org.apache.druid.query.groupby.DeferExpressionDimensions;
import org.apache.druid.query.groupby.epinephelinae.vector.GroupByVectorColumnSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnIndexSupplier;
import org.apache.druid.segment.data.ReadableOffset;
Expand Down Expand Up @@ -240,6 +242,26 @@ default VectorObjectSelector makeVectorObjectSelector(
return null;
}

/**
* Returns a group-by selector. Allows virtual columns to control their own grouping behavior.
*
* The default implementation returns null, meaning this virtual column has no specialized group-by selector.
*
* @param columnName column name
* @param factory column selector factory
* @param deferExpressionDimensions active value of {@link org.apache.druid.query.groupby.GroupByQueryConfig#CTX_KEY_DEFER_EXPRESSION_DIMENSIONS}
*
* @return selector, or null if this virtual column does not have a specialized one
*/
@SuppressWarnings("unused")
@Nullable
default GroupByVectorColumnSelector makeGroupByVectorColumnSelector(
String columnName,
VectorColumnSelectorFactory factory,
DeferExpressionDimensions deferExpressionDimensions
)
{
return null;
}

/**
* This method is deprecated in favor of {@link #capabilities(ColumnInspector, String)}, which should be used whenever
* possible and can support virtual column implementations that need to inspect other columns as inputs.
Expand All @@ -265,8 +287,9 @@ default VectorObjectSelector makeVectorObjectSelector(
* Examples of this include the {@link ExpressionVirtualColumn}, which takes input from other columns and uses the
* {@link ColumnInspector} to infer the output type of expressions based on the types of the inputs.
*
* @param inspector column inspector to provide additional information of other available columns
* @param inspector column inspector to provide additional information of other available columns
* @param columnName the name this virtual column was referenced with
*
* @return capabilities, must not be null
*/
@Nullable
Expand Down
Loading

0 comments on commit dbed1b0

Please sign in to comment.