Skip to content

Commit

Permalink
Check for Unsupported Aggregation with Distinct when useApproxCountDi…
Browse files Browse the repository at this point in the history
…stinct is enabled (apache#16770)

* init

* add NativelySupportsDistinct

* refactor

* javadoc

* refactor

* fix tests

* fix drill tests

* comments

* Update sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java

---------

Co-authored-by: Benedict Jin <[email protected]>
  • Loading branch information
sreemanamala and asdf2014 committed Aug 6, 2024
1 parent aa55f82 commit 13f249b
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public Aggregation toDruidAggregation(
);
}

@NativelySupportsDistinct
private static class ApproxCountDistinctSqlAggFunction extends SqlAggFunction
{
ApproxCountDistinctSqlAggFunction(SqlAggFunction delegate)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.sql.calcite.aggregation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* This annotation is to distinguish {@link org.apache.calcite.sql.SqlAggFunction}
* which supports the distinct aggregation natively
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface NativelySupportsDistinct
{

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.NativelySupportsDistinct;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
Expand Down Expand Up @@ -142,6 +143,7 @@ public Aggregation toDruidAggregation(
}
}

@NativelySupportsDistinct
private static class ArrayConcatAggFunction extends SqlAggFunction
{
ArrayConcatAggFunction()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.druid.query.aggregation.ExpressionLambdaAggregatorFactory;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.NativelySupportsDistinct;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
Expand Down Expand Up @@ -165,6 +166,7 @@ public RelDataType inferReturnType(SqlOperatorBinding sqlOperatorBinding)
}
}

@NativelySupportsDistinct
private static class ArrayAggFunction extends SqlAggFunction
{
private static final ArrayAggReturnTypeInference RETURN_TYPE_INFERENCE = new ArrayAggReturnTypeInference();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.NativelySupportsDistinct;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
Expand Down Expand Up @@ -226,6 +227,7 @@ public RelDataType inferReturnType(SqlOperatorBinding sqlOperatorBinding)
}
}

@NativelySupportsDistinct
private static class StringAggFunction extends SqlAggFunction
{
private static final StringAggReturnTypeInference RETURN_TYPE_INFERENCE = new StringAggReturnTypeInference();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.calcite.rel.type.RelRecordType;
import org.apache.calcite.runtime.CalciteContextException;
import org.apache.calcite.runtime.CalciteException;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlInsert;
Expand Down Expand Up @@ -65,6 +66,7 @@
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.Types;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.sql.calcite.aggregation.NativelySupportsDistinct;
import org.apache.druid.sql.calcite.expression.builtin.ScalarInArrayOperatorConversion;
import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
Expand Down Expand Up @@ -760,8 +762,10 @@ public void validateCall(SqlCall call, SqlValidatorScope scope)
throw buildCalciteContextException(
StringUtils.format(
"The query contains window functions; To run these window functions, specify [%s] in query context.",
PlannerContext.CTX_ENABLE_WINDOW_FNS),
call);
PlannerContext.CTX_ENABLE_WINDOW_FNS
),
call
);
}
}
if (call.getKind() == SqlKind.NULLS_FIRST) {
Expand All @@ -776,6 +780,19 @@ public void validateCall(SqlCall call, SqlValidatorScope scope)
throw buildCalciteContextException("ASCENDING ordering with NULLS LAST is not supported!", call);
}
}
if (plannerContext.getPlannerConfig().isUseApproximateCountDistinct() && isSqlCallDistinct(call)) {
if (call.getOperator().getKind() != SqlKind.COUNT && call.getOperator() instanceof SqlAggFunction) {
if (!call.getOperator().getClass().isAnnotationPresent(NativelySupportsDistinct.class)) {
throw buildCalciteContextException(
StringUtils.format(
"Aggregation [%s] with DISTINCT is not supported when useApproximateCountDistinct is enabled. Run with disabling it.",
call.getOperator().getName()
),
call
);
}
}
}
super.validateCall(call, scope);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.NativelySupportsDistinct;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.filtration.Filtration;
Expand Down Expand Up @@ -69,6 +71,16 @@ public static Aggregation translateAggregateCall(
return null;
}

if (call.isDistinct() && call.getAggregation().getKind() != SqlKind.COUNT) {
if (!call.getAggregation().getClass().isAnnotationPresent(NativelySupportsDistinct.class)) {
plannerContext.setPlanningError(
"Aggregation [%s] with DISTINCT is not supported when useApproximateCountDistinct is enabled. Run with disabling it.",
call.getAggregation().getName()
);
return null;
}
}

final DimFilter filter;

if (call.filterArg >= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15501,6 +15501,20 @@ public void testWindowingErrorWithoutFeatureFlag()
assertThat(e, invalidSqlIs("The query contains window functions; To run these window functions, specify [enableWindowing] in query context. (line [1], column [13])"));
}

@Test
public void testDistinctSumNotSupportedWithApproximation()
{
DruidException e = assertThrows(
DruidException.class,
() -> testBuilder()
.queryContext(ImmutableMap.of(PlannerConfig.CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT, true))
.sql("SELECT sum(distinct m1) from druid.foo")
.run()
);

assertThat(e, invalidSqlContains("Aggregation [SUM] with DISTINCT is not supported"));
}

@Test
public void testUnSupportedNullsFirst()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4426,7 +4426,7 @@ public void test_last_val_lastValFn_39()
windowQueryTest();
}

@NotYetSupported(Modes.NOT_ENOUGH_RULES)
@NotYetSupported(Modes.DISTINCT_AGGREGATE_NOT_SUPPORTED)
@DrillTest("nestedAggs/emtyOvrCls_7")
@Test
public void test_nestedAggs_emtyOvrCls_7()
Expand Down Expand Up @@ -7274,7 +7274,7 @@ public void test_nestedAggs_emtyOvrCls_13()
windowQueryTest();
}

@NotYetSupported(Modes.RESULT_MISMATCH)
@NotYetSupported(Modes.DISTINCT_AGGREGATE_NOT_SUPPORTED)
@DrillTest("nestedAggs/emtyOvrCls_8")
@Test
public void test_nestedAggs_emtyOvrCls_8()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
enum Modes
{
// @formatter:off
NOT_ENOUGH_RULES(DruidException.class, "not enough rules"),
DISTINCT_AGGREGATE_NOT_SUPPORTED(DruidException.class, "DISTINCT is not supported"),
ERROR_HANDLING(AssertionError.class, "targetPersona: is <[A-Z]+> and category: is <[A-Z_]+> and errorCode: is"),
EXPRESSION_NOT_GROUPED(DruidException.class, "Expression '[a-z]+' is not being grouped"),
NULLS_FIRST_LAST(DruidException.class, "NULLS (FIRST|LAST)"),
Expand Down

0 comments on commit 13f249b

Please sign in to comment.