Skip to content

Commit

Permalink
Speed up SQL IN using SCALAR_IN_ARRAY. (#16388)
Browse files Browse the repository at this point in the history
* Speed up SQL IN using SCALAR_IN_ARRAY.

Main changes:

1) DruidSqlValidator now includes a rewrite of IN to SCALAR_IN_ARRAY, when the size of
   the IN is above inFunctionThreshold. The default value of inFunctionThreshold
   is 100. Users can restore the prior behavior by setting it to Integer.MAX_VALUE.

2) SearchOperatorConversion now generates SCALAR_IN_ARRAY when converting to a regular
   expression, when the size of the SEARCH is above inFunctionExprThreshold. The default
   value of inFunctionExprThreshold is 2. Users can restore the prior behavior by setting
   it to Integer.MAX_VALUE.

3) ReverseLookupRule generates SCALAR_IN_ARRAY if the set of reverse-looked-up values is
   greater than inFunctionThreshold.

* Revert test.

* Additional coverage.

* Update docs/querying/sql-query-context.md

Co-authored-by: Benedict Jin <[email protected]>

* New test.

---------

Co-authored-by: Benedict Jin <[email protected]>
  • Loading branch information
gianm and asdf2014 authored May 14, 2024
1 parent c1bf4fe commit 72432c2
Show file tree
Hide file tree
Showing 11 changed files with 857 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,4 +157,27 @@ public void planNotEquals(Blackhole blackhole)
blackhole.consume(plannerResult);
}
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void planEqualsInsideAndOutsideCase(Blackhole blackhole)
{
final String sql = StringUtils.format(
"SELECT COUNT(*) FROM foo\n"
+ "WHERE\n"
+ " CASE WHEN LOOKUP(dimZipf, 'benchmark-lookup', 'N/A') = '%s'\n"
+ " THEN NULL\n"
+ " ELSE LOOKUP(dimZipf, 'benchmark-lookup', 'N/A')\n"
+ " END IN ('%s', '%s', '%s')",
LookupBenchmarkUtil.makeKeyOrValue(0),
LookupBenchmarkUtil.makeKeyOrValue(1),
LookupBenchmarkUtil.makeKeyOrValue(2),
LookupBenchmarkUtil.makeKeyOrValue(3)
);
try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, sql, ImmutableMap.of())) {
final PlannerResult plannerResult = planner.plan();
blackhole.consume(plannerResult);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
Expand All @@ -36,6 +37,7 @@
import org.apache.druid.segment.AutoTypeColumnSchema;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.generator.SegmentGenerator;
Expand Down Expand Up @@ -204,7 +206,7 @@ public void setup() throws JsonProcessingException
);

String prefix = ("explain plan for select long1 from foo where long1 in ");
final String sql = createQuery(prefix, inClauseLiteralsCount);
final String sql = createQuery(prefix, inClauseLiteralsCount, ValueType.LONG);

final Sequence<Object[]> resultSequence = getPlan(sql, null);
final Object[] planResult = resultSequence.toList().get(0);
Expand All @@ -222,12 +224,13 @@ public void tearDown() throws Exception
closer.close();
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void queryInSql(Blackhole blackhole)
{
String prefix = "explain plan for select long1 from foo where long1 in ";
final String sql = createQuery(prefix, inClauseLiteralsCount);
final String sql = createQuery(prefix, inClauseLiteralsCount, ValueType.LONG);
getPlan(sql, blackhole);
}

Expand All @@ -238,7 +241,7 @@ public void queryEqualOrInSql(Blackhole blackhole)
{
String prefix =
"explain plan for select long1 from foo where string1 = '7' or long1 in ";
final String sql = createQuery(prefix, inClauseLiteralsCount);
final String sql = createQuery(prefix, inClauseLiteralsCount, ValueType.LONG);
getPlan(sql, blackhole);
}

Expand All @@ -250,28 +253,74 @@ public void queryMultiEqualOrInSql(Blackhole blackhole)
{
String prefix =
"explain plan for select long1 from foo where string1 = '7' or string1 = '8' or long1 in ";
final String sql = createQuery(prefix, inClauseLiteralsCount);
final String sql = createQuery(prefix, inClauseLiteralsCount, ValueType.LONG);
getPlan(sql, blackhole);
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void queryJoinEqualOrInSql(Blackhole blackhole)
public void queryStringFunctionInSql(Blackhole blackhole)
{
String prefix =
"explain plan for select count(*) from foo where long1 = 8 or lower(string1) in ";
final String sql = createQuery(prefix, inClauseLiteralsCount, ValueType.STRING);
getPlan(sql, blackhole);
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void queryStringFunctionIsNotNullAndNotInSql(Blackhole blackhole)
{
String prefix =
"explain plan for select count(*) from foo where long1 = 8 and lower(string1) is not null and lower(string1) not in ";
final String sql = createQuery(prefix, inClauseLiteralsCount, ValueType.STRING);
getPlan(sql, blackhole);
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void queryStringFunctionIsNullOrInSql(Blackhole blackhole)
{
String prefix =
"explain plan for select count(*) from foo where long1 = 8 and (lower(string1) is null or lower(string1) in ";
final String sql = createQuery(prefix, inClauseLiteralsCount, ValueType.STRING) + ')';
getPlan(sql, blackhole);
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void queryJoinEqualOrInSql(Blackhole blackhole)
{
String prefix =
"explain plan for select foo.long1, fooright.string1 from foo inner join foo as fooright on foo.string1 = fooright.string1 where fooright.string1 = '7' or foo.long1 in ";
final String sql = createQuery(prefix, inClauseLiteralsCount);
final String sql = createQuery(prefix, inClauseLiteralsCount, ValueType.LONG);
getPlan(sql, blackhole);
}

private String createQuery(String prefix, int inClauseLiteralsCount)
private String createQuery(String prefix, int inClauseLiteralsCount, ValueType type)
{
StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append(prefix).append('(');
IntStream.range(1, inClauseLiteralsCount - 1).forEach(i -> sqlBuilder.append(i).append(","));
sqlBuilder.append(inClauseLiteralsCount).append(")");
IntStream.range(1, inClauseLiteralsCount + 1).forEach(
i -> {
if (i > 1) {
sqlBuilder.append(',');
}

if (type == ValueType.LONG) {
sqlBuilder.append(i);
} else if (type == ValueType.STRING) {
sqlBuilder.append("'").append(i).append("'");
} else {
throw new ISE("Cannot generate IN with type[%s]", type);
}
}
);
sqlBuilder.append(")");
return sqlBuilder.toString();
}

Expand Down
4 changes: 3 additions & 1 deletion docs/querying/sql-query-context.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ Configure Druid SQL query planning using the parameters in the table below.
|`sqlPullUpLookup`|Whether to consider the [pull-up rewrite](lookups.md#pull-up) of the `LOOKUP` function during SQL planning.|true|
|`enableJoinLeftTableScanDirect`|`false`|This flag applies to queries which have joins. For joins, where left child is a simple scan with a filter, by default, druid will run the scan as a query and the join the results to the right child on broker. Setting this flag to true overrides that behavior and druid will attempt to push the join to data servers instead. Please note that the flag could be applicable to queries even if there is no explicit join. since queries can internally translated into a join by the SQL planner.|
|`maxNumericInFilters`|`-1`|Max limit for the amount of numeric values that can be compared for a string type dimension when the entire SQL WHERE clause of a query translates only to an [OR](../querying/filters.md#or) of [Bound filter](../querying/filters.md#bound-filter). By default, Druid does not restrict the amount of of numeric Bound Filters on String columns, although this situation may block other queries from running. Set this parameter to a smaller value to prevent Druid from running queries that have prohibitively long segment processing times. The optimal limit requires some trial and error; we recommend starting with 100. Users who submit a query that exceeds the limit of `maxNumericInFilters` should instead rewrite their queries to use strings in the `WHERE` clause instead of numbers. For example, `WHERE someString IN (‘123’, ‘456’)`. This value cannot exceed the set system configuration `druid.sql.planner.maxNumericInFilters`. This value is ignored if `druid.sql.planner.maxNumericInFilters` is not set explicitly.|
|`inSubQueryThreshold`|`2147483647`| Threshold for minimum number of values in an IN clause to convert the query to a JOIN operation on an inlined table rather than a predicate. A threshold of 0 forces usage of an inline table in all cases; a threshold of [Integer.MAX_VALUE] forces usage of OR in all cases. |
|`inFunctionThreshold`|`100`| At or beyond this threshold number of values, SQL `IN` is converted to [`SCALAR_IN_ARRAY`](sql-functions.md#scalar_in_array). A threshold of 0 forces this conversion in all cases. A threshold of [Integer.MAX_VALUE] disables this conversion. The converted function is eligible for fewer planning-time optimizations, which speeds up planning, but may prevent certain planning-time optimizations.|
|`inFunctionExprThreshold`|`2`| At or beyond this threshold number of values, SQL `IN` is eligible for execution using the native function `scalar_in_array` rather than an <code>&#124;&#124;</code> of `==`, even if the number of values is below `inFunctionThreshold`. This property only affects translation of SQL `IN` to a [native expression](math-expr.md). It does not affect translation of SQL `IN` to a [native filter](filters.md). This property is provided for backwards compatibility purposes, and may be removed in a future release.|
|`inSubQueryThreshold`|`2147483647`| At or beyond this threshold number of values, SQL `IN` is converted to `JOIN` on an inline table. `inFunctionThreshold` takes priority over this setting. A threshold of 0 forces usage of an inline table in all cases where the size of a SQL `IN` is larger than `inFunctionThreshold`. A threshold of `2147483647` disables the rewrite of SQL `IN` to `JOIN`. |

## Setting the query context
The query context parameters can be specified as a "context" object in the [JSON API](../api-reference/sql-api.md) or as a [JDBC connection properties object](../api-reference/sql-jdbc.md).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.QueryContexts.Vectorize;
import org.apache.druid.query.filter.InDimFilter;
import org.apache.druid.query.filter.TypedInFilter;
import org.apache.druid.segment.QueryableIndexStorageAdapter;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
Expand Down Expand Up @@ -575,6 +576,35 @@ public int getInSubQueryThreshold(int defaultValue)
);
}

/**
* At or above this threshold number of values, when planning SQL queries, use the SQL SCALAR_IN_ARRAY operator rather
* than a stack of SQL ORs. This speeds up planning for large sets of points because it is opaque to various
* expensive optimizations. But, because this does bypass certain optimizations, we only do the transformation above
* a certain threshold. The SCALAR_IN_ARRAY operator is still able to convert to {@link InDimFilter} or
* {@link TypedInFilter}.
*/
public int getInFunctionThreshold()
{
return getInt(
QueryContexts.IN_FUNCTION_THRESHOLD,
QueryContexts.DEFAULT_IN_FUNCTION_THRESHOLD
);
}

/**
* At or above this threshold, when converting the SEARCH operator to a native expression, use the "scalar_in_array"
* function rather than a sequence of equals (==) separated by or (||). This is typically a lower threshold
* than {@link #getInFunctionThreshold()}, because it does not prevent any SQL planning optimizations, and it
* speeds up query execution.
*/
public int getInFunctionExprThreshold()
{
return getInt(
QueryContexts.IN_FUNCTION_EXPR_THRESHOLD,
QueryContexts.DEFAULT_IN_FUNCTION_EXPR_THRESHOLD
);
}

public boolean isTimeBoundaryPlanningEnabled()
{
return getBoolean(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public class QueryContexts
public static final String BY_SEGMENT_KEY = "bySegment";
public static final String BROKER_SERVICE_NAME = "brokerService";
public static final String IN_SUB_QUERY_THRESHOLD_KEY = "inSubQueryThreshold";
public static final String IN_FUNCTION_THRESHOLD = "inFunctionThreshold";
public static final String IN_FUNCTION_EXPR_THRESHOLD = "inFunctionExprThreshold";
public static final String TIME_BOUNDARY_PLANNING_KEY = "enableTimeBoundaryPlanning";
public static final String POPULATE_CACHE_KEY = "populateCache";
public static final String POPULATE_RESULT_LEVEL_CACHE_KEY = "populateResultLevelCache";
Expand Down Expand Up @@ -120,6 +122,8 @@ public class QueryContexts
public static final boolean DEFAULT_SECONDARY_PARTITION_PRUNING = true;
public static final boolean DEFAULT_ENABLE_DEBUG = false;
public static final int DEFAULT_IN_SUB_QUERY_THRESHOLD = Integer.MAX_VALUE;
public static final int DEFAULT_IN_FUNCTION_THRESHOLD = 100;
public static final int DEFAULT_IN_FUNCTION_EXPR_THRESHOLD = 2;
public static final boolean DEFAULT_ENABLE_TIME_BOUNDARY_PLANNING = false;
public static final boolean DEFAULT_WINDOWING_STRICT_VALIDATION = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.junit.Test;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -337,11 +336,35 @@ public void testGetMaxSubqueryBytes()
ImmutableMap.of(QueryContexts.MAX_SUBQUERY_BYTES_KEY, "auto")
);
assertEquals("auto", context2.getMaxSubqueryMemoryBytes(null));

final QueryContext context3 = new QueryContext(ImmutableMap.of());
assertEquals("disabled", context3.getMaxSubqueryMemoryBytes("disabled"));
}

@Test
public void testGetInFunctionThreshold()
{
final QueryContext context1 = new QueryContext(
ImmutableMap.of(QueryContexts.IN_FUNCTION_THRESHOLD, Integer.MAX_VALUE)
);
assertEquals(Integer.MAX_VALUE, context1.getInFunctionThreshold());

final QueryContext context2 = QueryContext.empty();
assertEquals(QueryContexts.DEFAULT_IN_FUNCTION_THRESHOLD, context2.getInFunctionThreshold());
}

@Test
public void testGetInFunctionExprThreshold()
{
final QueryContext context1 = new QueryContext(
ImmutableMap.of(QueryContexts.IN_FUNCTION_EXPR_THRESHOLD, Integer.MAX_VALUE)
);
assertEquals(Integer.MAX_VALUE, context1.getInFunctionExprThreshold());

final QueryContext context2 = QueryContext.empty();
assertEquals(QueryContexts.DEFAULT_IN_FUNCTION_EXPR_THRESHOLD, context2.getInFunctionExprThreshold());
}

@Test
public void testDefaultEnableQueryDebugging()
{
Expand Down
Loading

0 comments on commit 72432c2

Please sign in to comment.