From 9a0667d0d6bdbc10444b41f4fed367496f0253b7 Mon Sep 17 00:00:00 2001 From: Ankit Kothari Date: Thu, 28 Dec 2023 16:01:24 -0800 Subject: [PATCH 1/5] add sql compatibility for first/last on numeric values --- .../EarliestLatestAnySqlAggregator.java | 38 +++++++++++++++++++ .../druid-models/metric-spec/metric-spec.tsx | 12 ++++-- 2 files changed, 46 insertions(+), 4 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java index 21bcc833e04e..f6aa79cafa02 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java @@ -44,6 +44,9 @@ import org.apache.druid.error.DruidException; import org.apache.druid.error.InvalidSqlInput; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.SerializablePairLongDoubleComplexMetricSerde; +import org.apache.druid.query.aggregation.SerializablePairLongFloatComplexMetricSerde; +import org.apache.druid.query.aggregation.SerializablePairLongLongComplexMetricSerde; import org.apache.druid.query.aggregation.any.DoubleAnyAggregatorFactory; import org.apache.druid.query.aggregation.any.FloatAnyAggregatorFactory; import org.apache.druid.query.aggregation.any.LongAnyAggregatorFactory; @@ -68,6 +71,7 @@ import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.rel.InputAccessor; import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry; +import org.apache.druid.sql.calcite.table.RowSignatures; import javax.annotation.Nullable; import java.util.ArrayList; @@ -108,6 +112,18 @@ AggregatorFactory createAggregatorFactory( return new DoubleFirstAggregatorFactory(name, fieldName, timeColumn); case STRING: case COMPLEX: + if (type.getComplexTypeName() != null) { + switch (type.getComplexTypeName()) { + case SerializablePairLongLongComplexMetricSerde.TYPE_NAME: + return new LongFirstAggregatorFactory(name, fieldName, timeColumn); + case SerializablePairLongFloatComplexMetricSerde.TYPE_NAME: + return new FloatFirstAggregatorFactory(name, fieldName, timeColumn); + case SerializablePairLongDoubleComplexMetricSerde.TYPE_NAME: + return new DoubleFirstAggregatorFactory(name, fieldName, timeColumn); + default: + return new StringFirstAggregatorFactory(name, fieldName, timeColumn, maxStringBytes); + } + } return new StringFirstAggregatorFactory(name, fieldName, timeColumn, maxStringBytes); default: throw SimpleSqlAggregator.badTypeException(fieldName, "EARLIEST", type); @@ -135,6 +151,18 @@ AggregatorFactory createAggregatorFactory( return new DoubleLastAggregatorFactory(name, fieldName, timeColumn); case STRING: case COMPLEX: + if (type.getComplexTypeName() != null) { + switch (type.getComplexTypeName()) { + case SerializablePairLongLongComplexMetricSerde.TYPE_NAME: + return new LongLastAggregatorFactory(name, fieldName, timeColumn); + case SerializablePairLongFloatComplexMetricSerde.TYPE_NAME: + return new FloatLastAggregatorFactory(name, fieldName, timeColumn); + case SerializablePairLongDoubleComplexMetricSerde.TYPE_NAME: + return new DoubleLastAggregatorFactory(name, fieldName, timeColumn); + default: + return new StringLastAggregatorFactory(name, fieldName, timeColumn, maxStringBytes); + } + } return new StringLastAggregatorFactory(name, fieldName, timeColumn, maxStringBytes); default: throw SimpleSqlAggregator.badTypeException(fieldName, "LATEST", type); @@ -316,6 +344,16 @@ public EarliestLatestReturnTypeInference(int ordinal) public RelDataType inferReturnType(SqlOperatorBinding sqlOperatorBinding) { RelDataType type = sqlOperatorBinding.getOperandType(this.ordinal); + + // If complex and of type SerializablePairLong*, return type + if (type instanceof RowSignatures.ComplexSqlType) { + ColumnType complexColumnType = ((RowSignatures.ComplexSqlType) type).getColumnType(); + String complexTypeName = complexColumnType.getComplexTypeName(); + if (complexTypeName != null && (complexTypeName.equals(SerializablePairLongLongComplexMetricSerde.TYPE_NAME) || complexTypeName.equals(SerializablePairLongFloatComplexMetricSerde.TYPE_NAME) || complexTypeName.equals(SerializablePairLongDoubleComplexMetricSerde.TYPE_NAME))) { + return type; + } + } + // For non-number and non-string type, which is COMPLEX type, we set the return type to VARCHAR. if (!SqlTypeUtil.isNumeric(type) && !SqlTypeUtil.isString(type)) { diff --git a/web-console/src/druid-models/metric-spec/metric-spec.tsx b/web-console/src/druid-models/metric-spec/metric-spec.tsx index ee689880d525..ad560b7a99e2 100644 --- a/web-console/src/druid-models/metric-spec/metric-spec.tsx +++ b/web-console/src/druid-models/metric-spec/metric-spec.tsx @@ -97,10 +97,14 @@ export const METRIC_SPEC_FIELDS: Field[] = [ group: 'max', suggestions: ['longMax', 'doubleMax', 'floatMax'], }, - // Do not show first and last aggregators as they can not be used in ingestion specs and this definition is only used in the data loader. - // Ref: https://druid.apache.org/docs/latest/querying/aggregations.html#first--last-aggregator - // Should the first / last aggregators become usable at ingestion time, reverse the changes made in: - // https://github.com/apache/druid/pull/10794 + { + group: 'first', + suggestions: ['longFirst', 'doubleFirst', 'floatFirst'], + }, + { + group: 'last', + suggestions: ['longLast', 'doubleLast', 'floatLast'], + }, 'thetaSketch', 'arrayOfDoublesSketch', { From 18a479fb6f997f14e7c9af4e3ead7cccf238288f Mon Sep 17 00:00:00 2001 From: Ankit Kothari Date: Thu, 28 Dec 2023 20:44:08 -0800 Subject: [PATCH 2/5] update test cases --- .../sql/avatica/DruidAvaticaHandlerTest.java | 16 ++++- .../druid/sql/calcite/CalciteQueryTest.java | 36 +++++++++- .../sql/calcite/CalciteSimpleQueryTest.java | 2 +- .../druid/sql/calcite/util/CalciteTests.java | 2 + .../sql/calcite/util/TestDataBuilder.java | 67 ++++++++++++++++++- 5 files changed, 117 insertions(+), 6 deletions(-) diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java index 90e75e10f9ca..193ba0c7f1b1 100644 --- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java +++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java @@ -581,7 +581,13 @@ public void testDatabaseMetaDataTables() throws SQLException ), row( Pair.of("TABLE_CAT", "druid"), - Pair.of("TABLE_NAME", "wikipedia"), + Pair.of("TABLE_NAME", CalciteTests.WIKIPEDIA), + Pair.of("TABLE_SCHEM", "druid"), + Pair.of("TABLE_TYPE", "TABLE") + ), + row( + Pair.of("TABLE_CAT", "druid"), + Pair.of("TABLE_NAME", CalciteTests.WIKIPEDIA_FIRST_LAST), Pair.of("TABLE_SCHEM", "druid"), Pair.of("TABLE_TYPE", "TABLE") ) @@ -661,7 +667,13 @@ public void testDatabaseMetaDataTablesAsSuperuser() throws SQLException ), row( Pair.of("TABLE_CAT", "druid"), - Pair.of("TABLE_NAME", "wikipedia"), + Pair.of("TABLE_NAME", CalciteTests.WIKIPEDIA), + Pair.of("TABLE_SCHEM", "druid"), + Pair.of("TABLE_TYPE", "TABLE") + ), + row( + Pair.of("TABLE_CAT", "druid"), + Pair.of("TABLE_NAME", CalciteTests.WIKIPEDIA_FIRST_LAST), Pair.of("TABLE_SCHEM", "druid"), Pair.of("TABLE_TYPE", "TABLE") ) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 52d8bf715eeb..f4c4ff8b70e4 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -179,7 +179,8 @@ public void testInformationSchemaTables() .add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE", "NO", "NO"}) .add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE", "NO", "NO"}) .add(new Object[]{"druid", CalciteTests.USERVISITDATASOURCE, "TABLE", "NO", "NO"}) - .add(new Object[]{"druid", "wikipedia", "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.WIKIPEDIA, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.WIKIPEDIA_FIRST_LAST, "TABLE", "NO", "NO"}) .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE", "NO", "NO"}) .add(new Object[]{"INFORMATION_SCHEMA", "ROUTINES", "SYSTEM_TABLE", "NO", "NO"}) .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE", "NO", "NO"}) @@ -217,7 +218,8 @@ public void testInformationSchemaTables() .add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE", "NO", "NO"}) .add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE", "NO", "NO"}) .add(new Object[]{"druid", CalciteTests.USERVISITDATASOURCE, "TABLE", "NO", "NO"}) - .add(new Object[]{"druid", "wikipedia", "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.WIKIPEDIA, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.WIKIPEDIA_FIRST_LAST, "TABLE", "NO", "NO"}) .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE", "NO", "NO"}) .add(new Object[]{"INFORMATION_SCHEMA", "ROUTINES", "SYSTEM_TABLE", "NO", "NO"}) .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE", "NO", "NO"}) @@ -1075,6 +1077,36 @@ public void testStringLatestGroupBy() ); } + @Test + public void testNumericLatestEarliestGroupBy() + { + testQuery( + "SELECT isNew, LATEST(long_last_added), EARLIEST(long_first_added), LATEST(float_last_added), EARLIEST(float_first_added), LATEST(double_last_added), EARLIEST(double_first_added) FROM wikipedia_first_last GROUP BY isNew", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource(CalciteTests.WIKIPEDIA_FIRST_LAST) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(dimensions(new DefaultDimensionSpec("isNew", "d0"))) + .setAggregatorSpecs(aggregators( + new LongLastAggregatorFactory("a0", "long_last_added", null), + new LongFirstAggregatorFactory("a1", "long_first_added", null), + new FloatLastAggregatorFactory("a2", "float_last_added", null), + new FloatFirstAggregatorFactory("a3", "float_first_added", null), + new DoubleLastAggregatorFactory("a4", "double_last_added", null), + new DoubleFirstAggregatorFactory("a5", "double_first_added", null) + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{"false", "182", "36", "182.0", "36.0", "182.0", "36.0"}, + new Object[]{"true", "113", "345", "113.0", "345.0", "113.0", "345.0"} + ) + ); + } + @Test public void testStringLatestGroupByWithAlwaysFalseCondition() { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSimpleQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSimpleQueryTest.java index 1eb6d58bcb87..5dee57dac32b 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSimpleQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSimpleQueryTest.java @@ -647,7 +647,7 @@ public void testEarliestByLatestByWithExpression() .expectedQueries( ImmutableList.of( GroupByQuery.builder() - .setDataSource("wikipedia") + .setDataSource(CalciteTests.WIKIPEDIA) .setInterval(querySegmentSpec(Filtration.eternity())) .setGranularity(Granularities.ALL) .setVirtualColumns( diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index af723c3b27bf..a9ca48e90a9b 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -117,6 +117,8 @@ public class CalciteTests public static final String SOMEXDATASOURCE = "somexdatasource"; public static final String USERVISITDATASOURCE = "visits"; public static final String DRUID_SCHEMA_NAME = "druid"; + public static final String WIKIPEDIA = "wikipedia"; + public static final String WIKIPEDIA_FIRST_LAST = "wikipedia_first_last"; public static final String TEST_SUPERUSER_NAME = "testSuperuser"; public static final AuthorizerMapper TEST_AUTHORIZER_MAPPER = new AuthorizerMapper(null) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java index 2d3d681220f3..53480e2532d9 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java @@ -47,7 +47,12 @@ import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.FloatSumAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory; +import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory; import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory; +import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory; +import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory; import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; import org.apache.druid.segment.IndexBuilder; import org.apache.druid.segment.QueryableIndex; @@ -639,6 +644,57 @@ public static QueryableIndex makeWikipediaIndex(File tmpDir) .buildMMappedIndex(); } + public static QueryableIndex makeWikipediaIndexWithAggregation(File tmpDir) + { + final List dimensions = Arrays.asList( + new StringDimensionSchema("channel"), + new StringDimensionSchema("cityName"), + new StringDimensionSchema("comment"), + new StringDimensionSchema("countryIsoCode"), + new StringDimensionSchema("countryName"), + new StringDimensionSchema("isAnonymous"), + new StringDimensionSchema("isMinor"), + new StringDimensionSchema("isNew"), + new StringDimensionSchema("isRobot"), + new StringDimensionSchema("isUnpatrolled"), + new StringDimensionSchema("metroCode"), + new StringDimensionSchema("namespace"), + new StringDimensionSchema("page"), + new StringDimensionSchema("regionIsoCode"), + new StringDimensionSchema("regionName"), + new StringDimensionSchema("user") + ); + + return IndexBuilder + .create() + .tmpDir(new File(tmpDir, "wikipedia1")) + .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema(new IncrementalIndexSchema.Builder() + .withRollup(true) + .withTimestampSpec(new TimestampSpec("time", null, null)) + .withDimensionsSpec(new DimensionsSpec(dimensions)) + .withMetrics( + new LongLastAggregatorFactory("long_last_added", "added", "__time"), + new LongFirstAggregatorFactory("long_first_added", "added", "__time"), + new FloatLastAggregatorFactory("float_last_added", "added", "__time"), + new FloatLastAggregatorFactory("float_first_added", "added", "__time"), + new DoubleLastAggregatorFactory("double_last_added", "added", "__time"), + new DoubleFirstAggregatorFactory("double_first_added", "added", "__time") + + ) + .build() + ) + .inputSource( + ResourceInputSource.of( + TestDataBuilder.class.getClassLoader(), + "calcite/tests/wikiticker-2015-09-12-sampled.json.gz" + ) + ) + .inputFormat(DEFAULT_JSON_INPUT_FORMAT) + .inputTmpDir(new File(tmpDir, "tmpWikipedia1")) + .buildMMappedIndex(); + } + public static SpecificSegmentsQuerySegmentWalker createMockWalker( final Injector injector, final QueryRunnerFactoryConglomerate conglomerate, @@ -873,13 +929,22 @@ public static SpecificSegmentsQuerySegmentWalker createMockWalker( userVisitIndex ).add( DataSegment.builder() - .dataSource("wikipedia") + .dataSource(CalciteTests.WIKIPEDIA) .interval(Intervals.of("2015-09-12/2015-09-13")) .version("1") .shardSpec(new NumberedShardSpec(0, 0)) .size(0) .build(), makeWikipediaIndex(tmpDir) + ).add( + DataSegment.builder() + .dataSource(CalciteTests.WIKIPEDIA_FIRST_LAST) + .interval(Intervals.of("2015-09-12/2015-09-13")) + .version("1") + .shardSpec(new NumberedShardSpec(0, 0)) + .size(0) + .build(), + makeWikipediaIndexWithAggregation(tmpDir) ); } From fda6b2650effb796c0092bacae05d7fdbfd02845 Mon Sep 17 00:00:00 2001 From: Ankit Kothari Date: Fri, 29 Dec 2023 10:01:45 -0800 Subject: [PATCH 3/5] update test cases --- .../java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java index 5b49c649cc0c..f6297b28c64b 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java @@ -311,6 +311,9 @@ private static Supplier> getSupplierForSegment(SegmentId .inputTmpDir(temporaryFolder.newFolder()) .buildMMappedIndex(); break; + case CalciteTests.WIKIPEDIA_FIRST_LAST: + index = TestDataBuilder.makeWikipediaIndexWithAggregation(temporaryFolder.newFolder()); + break; default: throw new ISE("Cannot query segment %s in test runner", segmentId); From de6e912bbf09ce22dd986ebfb043f34e8848fa2f Mon Sep 17 00:00:00 2001 From: Ankit Kothari Date: Fri, 29 Dec 2023 10:34:32 -0800 Subject: [PATCH 4/5] update metric-spec.tsx to support string first/last agg --- .../druid-models/metric-spec/metric-spec.tsx | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/web-console/src/druid-models/metric-spec/metric-spec.tsx b/web-console/src/druid-models/metric-spec/metric-spec.tsx index ad560b7a99e2..99cf11aaed4d 100644 --- a/web-console/src/druid-models/metric-spec/metric-spec.tsx +++ b/web-console/src/druid-models/metric-spec/metric-spec.tsx @@ -59,6 +59,12 @@ const KNOWN_TYPES = [ 'longMax', 'doubleMax', 'floatMax', + 'longFirst', + 'longLast', + 'doubleFirst', + 'doubleLast', + 'floatFirst', + 'floatLast', 'stringFirst', 'stringLast', 'thetaSketch', @@ -99,11 +105,11 @@ export const METRIC_SPEC_FIELDS: Field[] = [ }, { group: 'first', - suggestions: ['longFirst', 'doubleFirst', 'floatFirst'], + suggestions: ['longFirst', 'doubleFirst', 'floatFirst', 'stringFirst'], }, { group: 'last', - suggestions: ['longLast', 'doubleLast', 'floatLast'], + suggestions: ['longLast', 'doubleLast', 'floatLast', 'stringLast'], }, 'thetaSketch', 'arrayOfDoublesSketch', @@ -133,6 +139,14 @@ export const METRIC_SPEC_FIELDS: Field[] = [ 'longMax', 'doubleMax', 'floatMax', + 'longFirst', + 'longLast', + 'doubleFirst', + 'doubleLast', + 'floatFirst', + 'floatLast', + 'stringFirst', + 'stringLast', 'thetaSketch', 'arrayOfDoublesSketch', 'HLLSketchBuild', From b5294a3f0f27ae536be4f0062503a8dffab57a2d Mon Sep 17 00:00:00 2001 From: Ankit Kothari Date: Tue, 9 Jan 2024 14:28:25 -0800 Subject: [PATCH 5/5] addressing comments + add more test case --- .../EarliestLatestAnySqlAggregator.java | 39 ++++++----------- .../druid/sql/calcite/CalciteQueryTest.java | 43 ++++++++++++++++++- 2 files changed, 53 insertions(+), 29 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java index f6aa79cafa02..66bbdf8a49bf 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java @@ -112,18 +112,6 @@ AggregatorFactory createAggregatorFactory( return new DoubleFirstAggregatorFactory(name, fieldName, timeColumn); case STRING: case COMPLEX: - if (type.getComplexTypeName() != null) { - switch (type.getComplexTypeName()) { - case SerializablePairLongLongComplexMetricSerde.TYPE_NAME: - return new LongFirstAggregatorFactory(name, fieldName, timeColumn); - case SerializablePairLongFloatComplexMetricSerde.TYPE_NAME: - return new FloatFirstAggregatorFactory(name, fieldName, timeColumn); - case SerializablePairLongDoubleComplexMetricSerde.TYPE_NAME: - return new DoubleFirstAggregatorFactory(name, fieldName, timeColumn); - default: - return new StringFirstAggregatorFactory(name, fieldName, timeColumn, maxStringBytes); - } - } return new StringFirstAggregatorFactory(name, fieldName, timeColumn, maxStringBytes); default: throw SimpleSqlAggregator.badTypeException(fieldName, "EARLIEST", type); @@ -151,18 +139,6 @@ AggregatorFactory createAggregatorFactory( return new DoubleLastAggregatorFactory(name, fieldName, timeColumn); case STRING: case COMPLEX: - if (type.getComplexTypeName() != null) { - switch (type.getComplexTypeName()) { - case SerializablePairLongLongComplexMetricSerde.TYPE_NAME: - return new LongLastAggregatorFactory(name, fieldName, timeColumn); - case SerializablePairLongFloatComplexMetricSerde.TYPE_NAME: - return new FloatLastAggregatorFactory(name, fieldName, timeColumn); - case SerializablePairLongDoubleComplexMetricSerde.TYPE_NAME: - return new DoubleLastAggregatorFactory(name, fieldName, timeColumn); - default: - return new StringLastAggregatorFactory(name, fieldName, timeColumn, maxStringBytes); - } - } return new StringLastAggregatorFactory(name, fieldName, timeColumn, maxStringBytes); default: throw SimpleSqlAggregator.badTypeException(fieldName, "LATEST", type); @@ -345,12 +321,21 @@ public RelDataType inferReturnType(SqlOperatorBinding sqlOperatorBinding) { RelDataType type = sqlOperatorBinding.getOperandType(this.ordinal); - // If complex and of type SerializablePairLong*, return type + // If complex and of type SerializablePairLong*, return scalar type if (type instanceof RowSignatures.ComplexSqlType) { ColumnType complexColumnType = ((RowSignatures.ComplexSqlType) type).getColumnType(); String complexTypeName = complexColumnType.getComplexTypeName(); - if (complexTypeName != null && (complexTypeName.equals(SerializablePairLongLongComplexMetricSerde.TYPE_NAME) || complexTypeName.equals(SerializablePairLongFloatComplexMetricSerde.TYPE_NAME) || complexTypeName.equals(SerializablePairLongDoubleComplexMetricSerde.TYPE_NAME))) { - return type; + if (complexTypeName != null) { + switch (complexTypeName) { + case SerializablePairLongLongComplexMetricSerde.TYPE_NAME: + return sqlOperatorBinding.getTypeFactory().createSqlType(SqlTypeName.BIGINT); + case SerializablePairLongFloatComplexMetricSerde.TYPE_NAME: + return sqlOperatorBinding.getTypeFactory().createSqlType(SqlTypeName.FLOAT); + case SerializablePairLongDoubleComplexMetricSerde.TYPE_NAME: + return sqlOperatorBinding.getTypeFactory().createSqlType(SqlTypeName.DOUBLE); + default: + return sqlOperatorBinding.getTypeFactory().createSqlType(SqlTypeName.VARCHAR); + } } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index f4c4ff8b70e4..b391cb13cfe8 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -1101,8 +1101,47 @@ public void testNumericLatestEarliestGroupBy() .build() ), ImmutableList.of( - new Object[]{"false", "182", "36", "182.0", "36.0", "182.0", "36.0"}, - new Object[]{"true", "113", "345", "113.0", "345.0", "113.0", "345.0"} + new Object[]{"false", 182L, 36L, 182.0F, 36.0F, 182.0D, 36.0D}, + new Object[]{"true", 113L, 345L, 113.0F, 345.0F, 113.0D, 345.0D} + ) + ); + } + + @Test + public void testNumericLatestEarliestWithOpratorsGroupBy() + { + testQuery( + "SELECT isNew, LATEST(long_last_added)+4, EARLIEST(long_first_added)-4, LATEST(float_last_added)*2, EARLIEST(float_first_added)/2f, LATEST(double_last_added)+2.5, EARLIEST(double_first_added)-2.5 FROM wikipedia_first_last GROUP BY isNew", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource(CalciteTests.WIKIPEDIA_FIRST_LAST) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(dimensions(new DefaultDimensionSpec("isNew", "d0"))) + .setAggregatorSpecs(aggregators( + new LongLastAggregatorFactory("a0", "long_last_added", null), + new LongFirstAggregatorFactory("a1", "long_first_added", null), + new FloatLastAggregatorFactory("a2", "float_last_added", null), + new FloatFirstAggregatorFactory("a3", "float_first_added", null), + new DoubleLastAggregatorFactory("a4", "double_last_added", null), + new DoubleFirstAggregatorFactory("a5", "double_first_added", null) + ) + ) + .setPostAggregatorSpecs( + expressionPostAgg("p0", "(\"a0\" + 4)", ColumnType.LONG), + expressionPostAgg("p1", "(\"a1\" - 4)", ColumnType.LONG), + expressionPostAgg("p2", "(\"a2\" * 2)", ColumnType.FLOAT), + expressionPostAgg("p3", "(\"a3\" / 2)", ColumnType.FLOAT), + expressionPostAgg("p4", "(\"a4\" + 2.5)", ColumnType.DOUBLE), + expressionPostAgg("p5", "(\"a5\" - 2.5)", ColumnType.DOUBLE) + ) + + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{"false", 186L, 32L, 364.0F, 18.0F, 184.5D, 33.5D}, + new Object[]{"true", 117L, 341L, 226.0F, 172.5F, 115.5D, 342.5D} ) ); }