Add sql + ingestion compatibility for first/last on numeric values #15607

Merged
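
This PR makes EARLIEST/LATEST (first/last) usable on numeric columns end to end: SQL return-type inference now resolves the complex serializable-pair metric types to scalars, the test fixtures gain a `wikipedia_first_last` datasource ingested with numeric first/last metrics, and the web console offers these aggregators at ingestion time. The sketch below is illustrative only, using the datasource and column names from the test fixtures added in this PR:

```sql
-- EARLIEST/LATEST on numeric first/last metric columns now plan with
-- scalar return types instead of falling back to VARCHAR.
SELECT
  isNew,
  LATEST(long_last_added),      -- inferred as BIGINT
  EARLIEST(double_first_added)  -- inferred as DOUBLE
FROM wikipedia_first_last
GROUP BY isNew;
```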
@@ -311,6 +311,9 @@ private static Supplier<ResourceHolder<Segment>> getSupplierForSegment(SegmentId
.inputTmpDir(temporaryFolder.newFolder())
.buildMMappedIndex();
break;
case CalciteTests.WIKIPEDIA_FIRST_LAST:
index = TestDataBuilder.makeWikipediaIndexWithAggregation(temporaryFolder.newFolder());
break;
default:
throw new ISE("Cannot query segment %s in test runner", segmentId);

@@ -44,6 +44,9 @@
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.SerializablePairLongDoubleComplexMetricSerde;
import org.apache.druid.query.aggregation.SerializablePairLongFloatComplexMetricSerde;
import org.apache.druid.query.aggregation.SerializablePairLongLongComplexMetricSerde;
import org.apache.druid.query.aggregation.any.DoubleAnyAggregatorFactory;
import org.apache.druid.query.aggregation.any.FloatAnyAggregatorFactory;
import org.apache.druid.query.aggregation.any.LongAnyAggregatorFactory;
@@ -68,6 +71,7 @@
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import org.apache.druid.sql.calcite.table.RowSignatures;

import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -316,6 +320,25 @@ public EarliestLatestReturnTypeInference(int ordinal)
public RelDataType inferReturnType(SqlOperatorBinding sqlOperatorBinding)
{
RelDataType type = sqlOperatorBinding.getOperandType(this.ordinal);

// If the operand is a complex SerializablePairLong* type, return the corresponding scalar type
if (type instanceof RowSignatures.ComplexSqlType) {
ColumnType complexColumnType = ((RowSignatures.ComplexSqlType) type).getColumnType();
String complexTypeName = complexColumnType.getComplexTypeName();
if (complexTypeName != null) {
switch (complexTypeName) {
case SerializablePairLongLongComplexMetricSerde.TYPE_NAME:
return sqlOperatorBinding.getTypeFactory().createSqlType(SqlTypeName.BIGINT);
case SerializablePairLongFloatComplexMetricSerde.TYPE_NAME:
return sqlOperatorBinding.getTypeFactory().createSqlType(SqlTypeName.FLOAT);
case SerializablePairLongDoubleComplexMetricSerde.TYPE_NAME:
return sqlOperatorBinding.getTypeFactory().createSqlType(SqlTypeName.DOUBLE);
default:
return sqlOperatorBinding.getTypeFactory().createSqlType(SqlTypeName.VARCHAR);
}
}
}

// For non-numeric and non-string types (i.e., COMPLEX types), we set the return type to VARCHAR.
if (!SqlTypeUtil.isNumeric(type) &&
!SqlTypeUtil.isString(type)) {
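As a concrete illustration of the mapping above (names taken from this PR's test fixtures): a column ingested with a `longLast` aggregator is stored as a complex serializable pair of (timestamp, value), and the `SerializablePairLongLong` branch now infers BIGINT for it rather than falling through to the VARCHAR default:

```sql
-- Illustrative query against the wikipedia_first_last test fixture.
-- LATEST over the pair-typed column is now planned with a BIGINT return type.
SELECT LATEST(long_last_added) AS last_added
FROM wikipedia_first_last;
```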
@@ -581,7 +581,13 @@ public void testDatabaseMetaDataTables() throws SQLException
),
row(
Pair.of("TABLE_CAT", "druid"),
Pair.of("TABLE_NAME", "wikipedia"),
Pair.of("TABLE_NAME", CalciteTests.WIKIPEDIA),
Pair.of("TABLE_SCHEM", "druid"),
Pair.of("TABLE_TYPE", "TABLE")
),
row(
Pair.of("TABLE_CAT", "druid"),
Pair.of("TABLE_NAME", CalciteTests.WIKIPEDIA_FIRST_LAST),
Pair.of("TABLE_SCHEM", "druid"),
Pair.of("TABLE_TYPE", "TABLE")
)
@@ -661,7 +667,13 @@ public void testDatabaseMetaDataTablesAsSuperuser() throws SQLException
),
row(
Pair.of("TABLE_CAT", "druid"),
Pair.of("TABLE_NAME", "wikipedia"),
Pair.of("TABLE_NAME", CalciteTests.WIKIPEDIA),
Pair.of("TABLE_SCHEM", "druid"),
Pair.of("TABLE_TYPE", "TABLE")
),
row(
Pair.of("TABLE_CAT", "druid"),
Pair.of("TABLE_NAME", CalciteTests.WIKIPEDIA_FIRST_LAST),
Pair.of("TABLE_SCHEM", "druid"),
Pair.of("TABLE_TYPE", "TABLE")
)
@@ -179,7 +179,8 @@ public void testInformationSchemaTables()
.add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.USERVISITDATASOURCE, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", "wikipedia", "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.WIKIPEDIA, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.WIKIPEDIA_FIRST_LAST, "TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "ROUTINES", "SYSTEM_TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE", "NO", "NO"})
@@ -217,7 +218,8 @@ public void testInformationSchemaTables()
.add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.USERVISITDATASOURCE, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", "wikipedia", "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.WIKIPEDIA, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.WIKIPEDIA_FIRST_LAST, "TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "ROUTINES", "SYSTEM_TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE", "NO", "NO"})
@@ -1075,6 +1077,75 @@ public void testStringLatestGroupBy()
);
}

@Test
public void testNumericLatestEarliestGroupBy()
{
testQuery(
"SELECT isNew, LATEST(long_last_added), EARLIEST(long_first_added), LATEST(float_last_added), EARLIEST(float_first_added), LATEST(double_last_added), EARLIEST(double_first_added) FROM wikipedia_first_last GROUP BY isNew",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(CalciteTests.WIKIPEDIA_FIRST_LAST)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(new DefaultDimensionSpec("isNew", "d0")))
.setAggregatorSpecs(aggregators(
new LongLastAggregatorFactory("a0", "long_last_added", null),
new LongFirstAggregatorFactory("a1", "long_first_added", null),
new FloatLastAggregatorFactory("a2", "float_last_added", null),
new FloatFirstAggregatorFactory("a3", "float_first_added", null),
new DoubleLastAggregatorFactory("a4", "double_last_added", null),
new DoubleFirstAggregatorFactory("a5", "double_first_added", null)
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"false", 182L, 36L, 182.0F, 36.0F, 182.0D, 36.0D},
new Object[]{"true", 113L, 345L, 113.0F, 345.0F, 113.0D, 345.0D}
)
);
}

@Test
public void testNumericLatestEarliestWithOperatorsGroupBy()
{
testQuery(
"SELECT isNew, LATEST(long_last_added)+4, EARLIEST(long_first_added)-4, LATEST(float_last_added)*2, EARLIEST(float_first_added)/2f, LATEST(double_last_added)+2.5, EARLIEST(double_first_added)-2.5 FROM wikipedia_first_last GROUP BY isNew",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(CalciteTests.WIKIPEDIA_FIRST_LAST)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(new DefaultDimensionSpec("isNew", "d0")))
.setAggregatorSpecs(aggregators(
new LongLastAggregatorFactory("a0", "long_last_added", null),
new LongFirstAggregatorFactory("a1", "long_first_added", null),
new FloatLastAggregatorFactory("a2", "float_last_added", null),
new FloatFirstAggregatorFactory("a3", "float_first_added", null),
new DoubleLastAggregatorFactory("a4", "double_last_added", null),
new DoubleFirstAggregatorFactory("a5", "double_first_added", null)
)
)
.setPostAggregatorSpecs(
expressionPostAgg("p0", "(\"a0\" + 4)", ColumnType.LONG),
expressionPostAgg("p1", "(\"a1\" - 4)", ColumnType.LONG),
expressionPostAgg("p2", "(\"a2\" * 2)", ColumnType.FLOAT),
expressionPostAgg("p3", "(\"a3\" / 2)", ColumnType.FLOAT),
expressionPostAgg("p4", "(\"a4\" + 2.5)", ColumnType.DOUBLE),
expressionPostAgg("p5", "(\"a5\" - 2.5)", ColumnType.DOUBLE)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"false", 186L, 32L, 364.0F, 18.0F, 184.5D, 33.5D},
new Object[]{"true", 117L, 341L, 226.0F, 172.5F, 115.5D, 342.5D}
)
);
}

@Test
public void testStringLatestGroupByWithAlwaysFalseCondition()
{
@@ -647,7 +647,7 @@ public void testEarliestByLatestByWithExpression()
.expectedQueries(
ImmutableList.of(
GroupByQuery.builder()
.setDataSource("wikipedia")
.setDataSource(CalciteTests.WIKIPEDIA)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setVirtualColumns(
@@ -117,6 +117,8 @@ public class CalciteTests
public static final String SOMEXDATASOURCE = "somexdatasource";
public static final String USERVISITDATASOURCE = "visits";
public static final String DRUID_SCHEMA_NAME = "druid";
public static final String WIKIPEDIA = "wikipedia";
public static final String WIKIPEDIA_FIRST_LAST = "wikipedia_first_last";

public static final String TEST_SUPERUSER_NAME = "testSuperuser";
public static final AuthorizerMapper TEST_AUTHORIZER_MAPPER = new AuthorizerMapper(null)
@@ -47,7 +47,12 @@
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory;
import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory;
import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory;
import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
@@ -639,6 +644,57 @@ public static QueryableIndex makeWikipediaIndex(File tmpDir)
.buildMMappedIndex();
}

public static QueryableIndex makeWikipediaIndexWithAggregation(File tmpDir)
{
final List<DimensionSchema> dimensions = Arrays.asList(
new StringDimensionSchema("channel"),
new StringDimensionSchema("cityName"),
new StringDimensionSchema("comment"),
new StringDimensionSchema("countryIsoCode"),
new StringDimensionSchema("countryName"),
new StringDimensionSchema("isAnonymous"),
new StringDimensionSchema("isMinor"),
new StringDimensionSchema("isNew"),
new StringDimensionSchema("isRobot"),
new StringDimensionSchema("isUnpatrolled"),
new StringDimensionSchema("metroCode"),
new StringDimensionSchema("namespace"),
new StringDimensionSchema("page"),
new StringDimensionSchema("regionIsoCode"),
new StringDimensionSchema("regionName"),
new StringDimensionSchema("user")
);

return IndexBuilder
.create()
.tmpDir(new File(tmpDir, "wikipedia1"))
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
.schema(new IncrementalIndexSchema.Builder()
.withRollup(true)
.withTimestampSpec(new TimestampSpec("time", null, null))
.withDimensionsSpec(new DimensionsSpec(dimensions))
.withMetrics(
new LongLastAggregatorFactory("long_last_added", "added", "__time"),
new LongFirstAggregatorFactory("long_first_added", "added", "__time"),
new FloatLastAggregatorFactory("float_last_added", "added", "__time"),
new FloatLastAggregatorFactory("float_first_added", "added", "__time"),
new DoubleLastAggregatorFactory("double_last_added", "added", "__time"),
new DoubleFirstAggregatorFactory("double_first_added", "added", "__time")

)
.build()
)
.inputSource(
ResourceInputSource.of(
TestDataBuilder.class.getClassLoader(),
"calcite/tests/wikiticker-2015-09-12-sampled.json.gz"
)
)
.inputFormat(DEFAULT_JSON_INPUT_FORMAT)
.inputTmpDir(new File(tmpDir, "tmpWikipedia1"))
.buildMMappedIndex();
}

public static SpecificSegmentsQuerySegmentWalker createMockWalker(
final Injector injector,
final QueryRunnerFactoryConglomerate conglomerate,
@@ -873,13 +929,22 @@ public static SpecificSegmentsQuerySegmentWalker createMockWalker(
userVisitIndex
).add(
DataSegment.builder()
.dataSource("wikipedia")
.dataSource(CalciteTests.WIKIPEDIA)
.interval(Intervals.of("2015-09-12/2015-09-13"))
.version("1")
.shardSpec(new NumberedShardSpec(0, 0))
.size(0)
.build(),
makeWikipediaIndex(tmpDir)
).add(
DataSegment.builder()
.dataSource(CalciteTests.WIKIPEDIA_FIRST_LAST)
.interval(Intervals.of("2015-09-12/2015-09-13"))
.version("1")
.shardSpec(new NumberedShardSpec(0, 0))
.size(0)
.build(),
makeWikipediaIndexWithAggregation(tmpDir)
);
}

26 changes: 22 additions & 4 deletions web-console/src/druid-models/metric-spec/metric-spec.tsx
@@ -59,6 +59,12 @@ const KNOWN_TYPES = [
'longMax',
'doubleMax',
'floatMax',
'longFirst',
'longLast',
'doubleFirst',
'doubleLast',
'floatFirst',
'floatLast',
'stringFirst',
'stringLast',
'thetaSketch',
Expand Down Expand Up @@ -97,10 +103,14 @@ export const METRIC_SPEC_FIELDS: Field<MetricSpec>[] = [
group: 'max',
suggestions: ['longMax', 'doubleMax', 'floatMax'],
},
// Do not show first and last aggregators as they can not be used in ingestion specs and this definition is only used in the data loader.
// Ref: https://druid.apache.org/docs/latest/querying/aggregations.html#first--last-aggregator
// Should the first / last aggregators become usable at ingestion time, reverse the changes made in:
// https://github.com/apache/druid/pull/10794
{
group: 'first',
suggestions: ['longFirst', 'doubleFirst', 'floatFirst', 'stringFirst'],
},
{
group: 'last',
suggestions: ['longLast', 'doubleLast', 'floatLast', 'stringLast'],
},
'thetaSketch',
'arrayOfDoublesSketch',
{
Expand Down Expand Up @@ -129,6 +139,14 @@ export const METRIC_SPEC_FIELDS: Field<MetricSpec>[] = [
'longMax',
'doubleMax',
'floatMax',
'longFirst',
'longLast',
'doubleFirst',
'doubleLast',
'floatFirst',
'floatLast',
'stringFirst',
'stringLast',
A contributor commented on lines +142 to +149:
Perhaps this change isn't required. KNOWN_TYPES already contains the first/last aggregations.

'thetaSketch',
'arrayOfDoublesSketch',
'HLLSketchBuild',