Add sql + ingestion compatibility for first/last on numeric values #15607

Merged
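In brief, per the diff below: SQL's EARLIEST/LATEST (and their *_BY variants) now recognize columns ingested as first/last pair complex types (serializable long/float/double pairs) and plan them to the corresponding numeric first/last aggregators instead of the string fallback; the return-type inference is adjusted for those columns; the web console's data loader exposes the first/last metric types at ingestion time; and the Calcite test fixtures gain a `wikipedia_first_last` datasource built with those aggregators. For example, `SELECT LATEST(long_last_added) FROM wikipedia_first_last` plans to a native `LongLastAggregatorFactory`.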
Changes from 4 commits
@@ -311,6 +311,9 @@ private static Supplier<ResourceHolder<Segment>> getSupplierForSegment(SegmentId
            .inputTmpDir(temporaryFolder.newFolder())
            .buildMMappedIndex();
        break;
      case CalciteTests.WIKIPEDIA_FIRST_LAST:
        index = TestDataBuilder.makeWikipediaIndexWithAggregation(temporaryFolder.newFolder());
        break;
      default:
        throw new ISE("Cannot query segment %s in test runner", segmentId);

@@ -44,6 +44,9 @@
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.SerializablePairLongDoubleComplexMetricSerde;
import org.apache.druid.query.aggregation.SerializablePairLongFloatComplexMetricSerde;
import org.apache.druid.query.aggregation.SerializablePairLongLongComplexMetricSerde;
import org.apache.druid.query.aggregation.any.DoubleAnyAggregatorFactory;
import org.apache.druid.query.aggregation.any.FloatAnyAggregatorFactory;
import org.apache.druid.query.aggregation.any.LongAnyAggregatorFactory;
@@ -68,6 +71,7 @@
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import org.apache.druid.sql.calcite.table.RowSignatures;

import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -108,6 +112,18 @@ AggregatorFactory createAggregatorFactory(
      return new DoubleFirstAggregatorFactory(name, fieldName, timeColumn);
    case STRING:
    case COMPLEX:
      if (type.getComplexTypeName() != null) {
        switch (type.getComplexTypeName()) {
          case SerializablePairLongLongComplexMetricSerde.TYPE_NAME:
            return new LongFirstAggregatorFactory(name, fieldName, timeColumn);
          case SerializablePairLongFloatComplexMetricSerde.TYPE_NAME:
            return new FloatFirstAggregatorFactory(name, fieldName, timeColumn);
          case SerializablePairLongDoubleComplexMetricSerde.TYPE_NAME:
            return new DoubleFirstAggregatorFactory(name, fieldName, timeColumn);
          default:
            return new StringFirstAggregatorFactory(name, fieldName, timeColumn, maxStringBytes);
        }
      }
      return new StringFirstAggregatorFactory(name, fieldName, timeColumn, maxStringBytes);
    default:
      throw SimpleSqlAggregator.badTypeException(fieldName, "EARLIEST", type);
@@ -135,6 +151,18 @@ AggregatorFactory createAggregatorFactory(
      return new DoubleLastAggregatorFactory(name, fieldName, timeColumn);
    case STRING:
    case COMPLEX:
      if (type.getComplexTypeName() != null) {
        switch (type.getComplexTypeName()) {
          case SerializablePairLongLongComplexMetricSerde.TYPE_NAME:
            return new LongLastAggregatorFactory(name, fieldName, timeColumn);
          case SerializablePairLongFloatComplexMetricSerde.TYPE_NAME:
            return new FloatLastAggregatorFactory(name, fieldName, timeColumn);
          case SerializablePairLongDoubleComplexMetricSerde.TYPE_NAME:
            return new DoubleLastAggregatorFactory(name, fieldName, timeColumn);
          default:
            return new StringLastAggregatorFactory(name, fieldName, timeColumn, maxStringBytes);
        }
      }
      return new StringLastAggregatorFactory(name, fieldName, timeColumn, maxStringBytes);
    default:
      throw SimpleSqlAggregator.badTypeException(fieldName, "LATEST", type);
@@ -316,6 +344,16 @@ public EarliestLatestReturnTypeInference(int ordinal)
  public RelDataType inferReturnType(SqlOperatorBinding sqlOperatorBinding)
  {
    RelDataType type = sqlOperatorBinding.getOperandType(this.ordinal);

    // If complex and of type SerializablePairLong*, return the type as-is.
    if (type instanceof RowSignatures.ComplexSqlType) {
      ColumnType complexColumnType = ((RowSignatures.ComplexSqlType) type).getColumnType();
      String complexTypeName = complexColumnType.getComplexTypeName();
      if (complexTypeName != null
          && (complexTypeName.equals(SerializablePairLongLongComplexMetricSerde.TYPE_NAME)
              || complexTypeName.equals(SerializablePairLongFloatComplexMetricSerde.TYPE_NAME)
              || complexTypeName.equals(SerializablePairLongDoubleComplexMetricSerde.TYPE_NAME))) {
> **Contributor:** Is this correct? Shouldn't we be returning the scalar return type then? Why are we not doing this for string as well?
>
> **Author:** We can return the same for string, but since its return type is already string, which is handled in the if condition below, I didn't bother changing this.
>
> **Contributor:** `SELECT LATEST_BY("last_float_added", "__time") + 4.0 FROM "wikiticker_long_string_last_first"` doesn't work because the return type is incorrect.
>
> **Contributor:** If the input type is complex, it should return the scalar value for that complex type (float for longFloatPair etc.); else if the input is numeric, it should return that numeric type; else it should return varchar (for unknown complex, string, and any other SQL type cases).
>
> **Author:** Thanks for catching this @LakshSingla, updated the PR.

        return type;
      }
    }

    // For non-number and non-string type, which is COMPLEX type, we set the return type to VARCHAR.
    if (!SqlTypeUtil.isNumeric(type) &&
        !SqlTypeUtil.isString(type)) {
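The review thread above converged on inferring the scalar type for these pair columns rather than returning the complex type unchanged (the state shown in this four-commit diff predates that fix). Below is a minimal sketch of that inference, assuming Calcite's standard type-factory API and Druid's serde `TYPE_NAME` constants; it is an illustration, not the PR's final code:

```java
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlOperatorBinding;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.query.aggregation.SerializablePairLongDoubleComplexMetricSerde;
import org.apache.druid.query.aggregation.SerializablePairLongFloatComplexMetricSerde;
import org.apache.druid.query.aggregation.SerializablePairLongLongComplexMetricSerde;

// Would live inside EarliestLatestReturnTypeInference: resolve the scalar SQL
// type produced when aggregating a pre-aggregated first/last pair column.
static RelDataType scalarReturnType(SqlOperatorBinding binding, String complexTypeName)
{
  final SqlTypeName scalar;
  switch (complexTypeName) {
    case SerializablePairLongLongComplexMetricSerde.TYPE_NAME:
      scalar = SqlTypeName.BIGINT;
      break;
    case SerializablePairLongFloatComplexMetricSerde.TYPE_NAME:
      scalar = SqlTypeName.FLOAT;
      break;
    case SerializablePairLongDoubleComplexMetricSerde.TYPE_NAME:
      scalar = SqlTypeName.DOUBLE;
      break;
    default:
      // Unknown complex types keep the existing VARCHAR fallback.
      scalar = SqlTypeName.VARCHAR;
      break;
  }
  // Aggregates can be null over empty groups, so mark the type nullable.
  return binding.getTypeFactory().createTypeWithNullability(
      binding.getTypeFactory().createSqlType(scalar),
      true
  );
}
```

With this, an expression such as `LATEST_BY("last_float_added", "__time") + 4.0` type-checks, since the aggregate is typed FLOAT rather than COMPLEX.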
@@ -581,7 +581,13 @@ public void testDatabaseMetaDataTables() throws SQLException
        ),
        row(
            Pair.of("TABLE_CAT", "druid"),
-           Pair.of("TABLE_NAME", "wikipedia"),
+           Pair.of("TABLE_NAME", CalciteTests.WIKIPEDIA),
            Pair.of("TABLE_SCHEM", "druid"),
            Pair.of("TABLE_TYPE", "TABLE")
+       ),
+       row(
+           Pair.of("TABLE_CAT", "druid"),
+           Pair.of("TABLE_NAME", CalciteTests.WIKIPEDIA_FIRST_LAST),
+           Pair.of("TABLE_SCHEM", "druid"),
+           Pair.of("TABLE_TYPE", "TABLE")
        )
@@ -661,7 +667,13 @@ public void testDatabaseMetaDataTablesAsSuperuser() throws SQLException
        ),
        row(
            Pair.of("TABLE_CAT", "druid"),
-           Pair.of("TABLE_NAME", "wikipedia"),
+           Pair.of("TABLE_NAME", CalciteTests.WIKIPEDIA),
            Pair.of("TABLE_SCHEM", "druid"),
            Pair.of("TABLE_TYPE", "TABLE")
+       ),
+       row(
+           Pair.of("TABLE_CAT", "druid"),
+           Pair.of("TABLE_NAME", CalciteTests.WIKIPEDIA_FIRST_LAST),
+           Pair.of("TABLE_SCHEM", "druid"),
+           Pair.of("TABLE_TYPE", "TABLE")
        )
@@ -179,7 +179,8 @@ public void testInformationSchemaTables()
.add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.USERVISITDATASOURCE, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", "wikipedia", "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.WIKIPEDIA, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.WIKIPEDIA_FIRST_LAST, "TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "ROUTINES", "SYSTEM_TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE", "NO", "NO"})
@@ -217,7 +218,8 @@ public void testInformationSchemaTables()
.add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.USERVISITDATASOURCE, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", "wikipedia", "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.WIKIPEDIA, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.WIKIPEDIA_FIRST_LAST, "TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "ROUTINES", "SYSTEM_TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE", "NO", "NO"})
@@ -1075,6 +1077,36 @@ public void testStringLatestGroupBy()
);
}

  @Test
  public void testNumericLatestEarliestGroupBy()
  {
    testQuery(
        "SELECT isNew, LATEST(long_last_added), EARLIEST(long_first_added), LATEST(float_last_added), EARLIEST(float_first_added), LATEST(double_last_added), EARLIEST(double_first_added) FROM wikipedia_first_last GROUP BY isNew",
        ImmutableList.of(
            GroupByQuery.builder()
                        .setDataSource(CalciteTests.WIKIPEDIA_FIRST_LAST)
                        .setInterval(querySegmentSpec(Filtration.eternity()))
                        .setGranularity(Granularities.ALL)
                        .setDimensions(dimensions(new DefaultDimensionSpec("isNew", "d0")))
                        .setAggregatorSpecs(aggregators(
                            new LongLastAggregatorFactory("a0", "long_last_added", null),
                            new LongFirstAggregatorFactory("a1", "long_first_added", null),
                            new FloatLastAggregatorFactory("a2", "float_last_added", null),
                            new FloatFirstAggregatorFactory("a3", "float_first_added", null),
                            new DoubleLastAggregatorFactory("a4", "double_last_added", null),
                            new DoubleFirstAggregatorFactory("a5", "double_first_added", null)
                        ))
                        .setContext(QUERY_CONTEXT_DEFAULT)
                        .build()
        ),
        ImmutableList.of(
            new Object[]{"false", "182", "36", "182.0", "36.0", "182.0", "36.0"},
            new Object[]{"true", "113", "345", "113.0", "345.0", "113.0", "345.0"}
        )
    );
  }

@Test
public void testStringLatestGroupByWithAlwaysFalseCondition()
{
@@ -647,7 +647,7 @@ public void testEarliestByLatestByWithExpression()
        .expectedQueries(
            ImmutableList.of(
                GroupByQuery.builder()
-                           .setDataSource("wikipedia")
+                           .setDataSource(CalciteTests.WIKIPEDIA)
                            .setInterval(querySegmentSpec(Filtration.eternity()))
                            .setGranularity(Granularities.ALL)
                            .setVirtualColumns(
@@ -117,6 +117,8 @@ public class CalciteTests
public static final String SOMEXDATASOURCE = "somexdatasource";
public static final String USERVISITDATASOURCE = "visits";
public static final String DRUID_SCHEMA_NAME = "druid";
public static final String WIKIPEDIA = "wikipedia";
public static final String WIKIPEDIA_FIRST_LAST = "wikipedia_first_last";

public static final String TEST_SUPERUSER_NAME = "testSuperuser";
public static final AuthorizerMapper TEST_AUTHORIZER_MAPPER = new AuthorizerMapper(null)
@@ -47,7 +47,12 @@
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory;
import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory;
import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory;
import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
@@ -639,6 +644,57 @@ public static QueryableIndex makeWikipediaIndex(File tmpDir)
.buildMMappedIndex();
}

  public static QueryableIndex makeWikipediaIndexWithAggregation(File tmpDir)
  {
    final List<DimensionSchema> dimensions = Arrays.asList(
        new StringDimensionSchema("channel"),
        new StringDimensionSchema("cityName"),
        new StringDimensionSchema("comment"),
        new StringDimensionSchema("countryIsoCode"),
        new StringDimensionSchema("countryName"),
        new StringDimensionSchema("isAnonymous"),
        new StringDimensionSchema("isMinor"),
        new StringDimensionSchema("isNew"),
        new StringDimensionSchema("isRobot"),
        new StringDimensionSchema("isUnpatrolled"),
        new StringDimensionSchema("metroCode"),
        new StringDimensionSchema("namespace"),
        new StringDimensionSchema("page"),
        new StringDimensionSchema("regionIsoCode"),
        new StringDimensionSchema("regionName"),
        new StringDimensionSchema("user")
    );

    return IndexBuilder
        .create()
        .tmpDir(new File(tmpDir, "wikipedia1"))
        .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
        .schema(new IncrementalIndexSchema.Builder()
                    .withRollup(true)
                    .withTimestampSpec(new TimestampSpec("time", null, null))
                    .withDimensionsSpec(new DimensionsSpec(dimensions))
                    .withMetrics(
                        new LongLastAggregatorFactory("long_last_added", "added", "__time"),
                        new LongFirstAggregatorFactory("long_first_added", "added", "__time"),
                        new FloatLastAggregatorFactory("float_last_added", "added", "__time"),
                        new FloatFirstAggregatorFactory("float_first_added", "added", "__time"),
                        new DoubleLastAggregatorFactory("double_last_added", "added", "__time"),
                        new DoubleFirstAggregatorFactory("double_first_added", "added", "__time")
                    )
                    .build()
        )
        .inputSource(
            ResourceInputSource.of(
                TestDataBuilder.class.getClassLoader(),
                "calcite/tests/wikiticker-2015-09-12-sampled.json.gz"
            )
        )
        .inputFormat(DEFAULT_JSON_INPUT_FORMAT)
        .inputTmpDir(new File(tmpDir, "tmpWikipedia1"))
        .buildMMappedIndex();
  }

public static SpecificSegmentsQuerySegmentWalker createMockWalker(
final Injector injector,
final QueryRunnerFactoryConglomerate conglomerate,
@@ -873,13 +929,22 @@ public static SpecificSegmentsQuerySegmentWalker createMockWalker(
        userVisitIndex
    ).add(
        DataSegment.builder()
-                  .dataSource("wikipedia")
+                  .dataSource(CalciteTests.WIKIPEDIA)
                   .interval(Intervals.of("2015-09-12/2015-09-13"))
                   .version("1")
                   .shardSpec(new NumberedShardSpec(0, 0))
                   .size(0)
                   .build(),
        makeWikipediaIndex(tmpDir)
+   ).add(
+       DataSegment.builder()
+                  .dataSource(CalciteTests.WIKIPEDIA_FIRST_LAST)
+                  .interval(Intervals.of("2015-09-12/2015-09-13"))
+                  .version("1")
+                  .shardSpec(new NumberedShardSpec(0, 0))
+                  .size(0)
+                  .build(),
+       makeWikipediaIndexWithAggregation(tmpDir)
    );
  }

26 changes: 22 additions & 4 deletions web-console/src/druid-models/metric-spec/metric-spec.tsx
@@ -59,6 +59,12 @@ const KNOWN_TYPES = [
'longMax',
'doubleMax',
'floatMax',
'longFirst',
'longLast',
'doubleFirst',
'doubleLast',
'floatFirst',
'floatLast',
'stringFirst',
'stringLast',
'thetaSketch',
@@ -97,10 +103,14 @@ export const METRIC_SPEC_FIELDS: Field<MetricSpec>[] = [
    {
      group: 'max',
      suggestions: ['longMax', 'doubleMax', 'floatMax'],
    },
-   // Do not show first and last aggregators as they can not be used in ingestion specs and this definition is only used in the data loader.
-   // Ref: https://druid.apache.org/docs/latest/querying/aggregations.html#first--last-aggregator
-   // Should the first / last aggregators become usable at ingestion time, reverse the changes made in:
-   // https://github.com/apache/druid/pull/10794
+   {
+     group: 'first',
+     suggestions: ['longFirst', 'doubleFirst', 'floatFirst', 'stringFirst'],
+   },
+   {
+     group: 'last',
+     suggestions: ['longLast', 'doubleLast', 'floatLast', 'stringLast'],
+   },
'thetaSketch',
'arrayOfDoublesSketch',
{
@@ -129,6 +139,14 @@ export const METRIC_SPEC_FIELDS: Field<MetricSpec>[] = [
'longMax',
'doubleMax',
'floatMax',
'longFirst',
'longLast',
'doubleFirst',
'doubleLast',
'floatFirst',
'floatLast',
'stringFirst',
'stringLast',
> **Contributor** (on lines +142 to +149): Perhaps this change isn't required. `KNOWN_TYPES` already contains the first/last aggregations.

'thetaSketch',
'arrayOfDoublesSketch',
'HLLSketchBuild',
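With the data loader now allowed to suggest first/last metrics (the comment removed in the hunk above, pointing at apache/druid#10794, documented the old restriction), an ingestion spec's metricsSpec can pre-aggregate these values. The sketch below shows the server-side equivalents of such entries, under the assumption that the JSON type names match the console's `KNOWN_TYPES` list; the metric and column names are illustrative, not from the PR:

```java
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory;

public class FirstLastMetricSpecSketch
{
  public static void main(String[] args)
  {
    // These factories correspond to metricsSpec JSON entries such as:
    //   {"type": "longFirst", "name": "first_added", "fieldName": "added"}
    //   {"type": "doubleLast", "name": "last_added", "fieldName": "added"}
    // Passing null for timeColumn orders by the row's __time, as in the tests above.
    AggregatorFactory first = new LongFirstAggregatorFactory("first_added", "added", null);
    AggregatorFactory last = new DoubleLastAggregatorFactory("last_added", "added", null);

    // At query time, EARLIEST/LATEST over the resulting pair-typed columns plan
    // back to numeric aggregators, per the SqlAggregator changes in this PR.
    System.out.println(first.getName() + " -> " + first.getClass().getSimpleName());
    System.out.println(last.getName() + " -> " + last.getClass().getSimpleName());
  }
}
```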