From 7a39e3eb6d23c94d57f15fe20b7307a6bcc494e6 Mon Sep 17 00:00:00 2001 From: Ankit Kothari Date: Wed, 21 Jun 2023 14:32:44 -0700 Subject: [PATCH] Add support for first/last for double/float/long #10702 --- .../wikipedia_merge_index_queries.json | 38 ++++++- .../indexer/wikipedia_merge_index_task.json | 30 +++++ ...merge_reindex_druid_input_source_task.json | 30 +++++ .../indexer/wikipedia_merge_reindex_task.json | 30 +++++ .../druid/jackson/AggregatorsModule.java | 7 ++ ...stractSerializableLongObjectPairSerde.java | 82 ++++++++++++++ .../SerializablePairLongDouble.java | 35 ++++++ ...zablePairLongDoubleComplexMetricSerde.java | 99 ++++++++++++++++ .../SerializablePairLongFloat.java | 35 ++++++ ...izablePairLongFloatComplexMetricSerde.java | 99 ++++++++++++++++ .../aggregation/SerializablePairLongLong.java | 35 ++++++ ...lizablePairLongLongComplexMetricSerde.java | 99 ++++++++++++++++ .../first/DoubleFirstAggregator.java | 20 ++-- .../first/DoubleFirstAggregatorFactory.java | 107 +++++------------- .../first/DoubleFirstBufferAggregator.java | 23 ++-- .../first/FloatFirstAggregator.java | 21 ++-- .../first/FloatFirstAggregatorFactory.java | 105 +++++------------ .../first/FloatFirstBufferAggregator.java | 23 ++-- .../first/GenericFirstAggregateCombiner.java | 66 +++++++++++ .../first/LongFirstAggregator.java | 19 +++- .../first/LongFirstAggregatorFactory.java | 104 +++++------------ .../first/LongFirstBufferAggregator.java | 19 +++- .../first/NumericFirstAggregator.java | 40 ++++++- .../first/NumericFirstBufferAggregator.java | 48 ++++++-- .../first/StringFirstAggregatorFactory.java | 4 +- .../first/StringFirstLastUtils.java | 7 +- .../last/DoubleLastAggregator.java | 20 ++-- .../last/DoubleLastAggregatorFactory.java | 106 +++++------------ .../last/DoubleLastBufferAggregator.java | 21 ++-- .../aggregation/last/FloatLastAggregator.java | 21 ++-- .../last/FloatLastAggregatorFactory.java | 104 +++++------------ .../last/FloatLastBufferAggregator.java | 22 ++-- .../last/GenericLastAggregateCombiner.java | 66 +++++++++++ .../aggregation/last/LongLastAggregator.java | 19 +++- .../last/LongLastAggregatorFactory.java | 105 +++++------------ .../last/LongLastBufferAggregator.java | 19 +++- .../last/NumericLastAggregator.java | 47 ++++++-- .../last/NumericLastBufferAggregator.java | 49 ++++++-- .../last/StringLastAggregatorFactory.java | 4 +- .../aggregation/AggregatorFactoryTest.java | 12 +- .../first/DoubleFirstAggregationTest.java | 53 +++++++-- .../first/FloatFirstAggregationTest.java | 49 ++++++-- .../first/LongFirstAggregationTest.java | 47 ++++++-- .../last/DoubleLastAggregationTest.java | 52 +++++++-- .../last/FloatLastAggregationTest.java | 51 +++++++-- .../last/LongLastAggregationTest.java | 52 +++++++-- .../GroupByQueryQueryToolChestTest.java | 7 +- .../topn/TopNQueryQueryToolChestTest.java | 7 +- .../druid/segment/IndexMergerRollupTest.java | 32 ++++-- 49 files changed, 1536 insertions(+), 654 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/AbstractSerializableLongObjectPairSerde.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongDouble.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongDoubleComplexMetricSerde.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongFloat.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongFloatComplexMetricSerde.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongLong.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongLongComplexMetricSerde.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/first/GenericFirstAggregateCombiner.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/last/GenericLastAggregateCombiner.java diff --git a/integration-tests/src/test/resources/indexer/wikipedia_merge_index_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_merge_index_queries.json index ab4674999b5d2..25ea61496165f 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_merge_index_queries.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_merge_index_queries.json @@ -26,6 +26,36 @@ "type":"stringLast", "name":"latest_user", "fieldName":"last_user" + }, + { + "type": "doubleFirst", + "name": "double_first_delta", + "fieldName": "double_first_delta" + }, + { + "type": "doubleLast", + "name": "double_last_delta", + "fieldName": "double_last_delta" + }, + { + "type": "longFirst", + "name": "long_first_delta", + "fieldName": "long_first_delta" + }, + { + "type": "longFirst", + "name": "long_last_delta", + "fieldName": "long_last_delta" + }, + { + "type": "floatFirst", + "name": "float_first_delta", + "fieldName": "float_first_delta" + }, + { + "type": "floatLast", + "name": "float_last_delta", + "fieldName": "float_last_delta" } ] }, @@ -35,7 +65,13 @@ "event" : { "continent":"Asia", "earliest_user":"masterYi", - "latest_user":"stringer" + "latest_user":"stringer", + "double_first_delta": 111.0, + "double_last_delta": -9.0, + "long_first_delta": 111, + "long_last_delta": -9, + "float_first_delta": 111.0, + "float_last_delta": -9.0 } } ] } diff --git a/integration-tests/src/test/resources/indexer/wikipedia_merge_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_merge_index_task.json index 268a3aef4a85a..0af62eb498846 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_merge_index_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_merge_index_task.json @@ -40,6 +40,36 @@ "type": "stringLast", "name": "last_user", "fieldName": "user" + }, + { + "type": "doubleFirst", + "name": "double_first_delta", + "fieldName": "delta" + }, + { + "type": "doubleLast", + "name": "double_last_delta", + "fieldName": "delta" + }, + { + "type": "longFirst", + "name": "long_first_delta", + "fieldName": "delta" + }, + { + "type": "longLast", + "name": "long_last_delta", + "fieldName": "delta" + }, + { + "type": "floatFirst", + "name": "float_first_delta", + "fieldName": "delta" + }, + { + "type": "floatLast", + "name": "float_last_delta", + "fieldName": "delta" } ], "granularitySpec": { diff --git a/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_druid_input_source_task.json b/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_druid_input_source_task.json index 9daae62c8d42d..348aff886455c 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_druid_input_source_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_druid_input_source_task.json @@ -56,6 +56,36 @@ "type": "stringLast", "name": "last_user", "fieldName": "last_user" + }, + { + "type": "doubleFirst", + "name": "double_first_delta", + "fieldName": "double_first_delta" + }, + { + "type": "doubleLast", + "name": "double_last_delta", + "fieldName": "double_last_delta" + }, + { + "type": "longFirst", + "name": "long_first_delta", + "fieldName": "long_first_delta" + }, + { + "type": "longLast", + "name": "long_last_delta", + "fieldName": "long_last_delta" + }, + { + "type": "floatFirst", + "name": "float_first_delta", + "fieldName": "float_first_delta" + }, + { + "type": "floatLast", + "name": "float_last_delta", + "fieldName": "float_last_delta" } ] } diff --git a/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_task.json b/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_task.json index 040fff005ce78..9c69c825b9e70 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_task.json @@ -37,6 +37,36 @@ "type": "stringLast", "name": "last_user", "fieldName": "last_user" + }, + { + "type": "doubleFirst", + "name": "double_first_delta", + "fieldName": "double_first_delta" + }, + { + "type": "doubleLast", + "name": "double_last_delta", + "fieldName": "double_last_delta" + }, + { + "type": "longFirst", + "name": "long_first_delta", + "fieldName": "long_first_delta" + }, + { + "type": "longLast", + "name": "long_last_delta", + "fieldName": "long_last_delta" + }, + { + "type": "floatFirst", + "name": "float_first_delta", + "fieldName": "float_first_delta" + }, + { + "type": "floatLast", + "name": "float_last_delta", + "fieldName": "float_last_delta" } ], "granularitySpec": { diff --git a/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java b/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java index 3130fefb85d3c..93d6afb1dd985 100644 --- a/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java +++ b/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java @@ -39,6 +39,9 @@ import org.apache.druid.query.aggregation.LongMinAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.SerializablePairLongDoubleComplexMetricSerde; +import org.apache.druid.query.aggregation.SerializablePairLongFloatComplexMetricSerde; +import org.apache.druid.query.aggregation.SerializablePairLongLongComplexMetricSerde; import org.apache.druid.query.aggregation.SerializablePairLongStringComplexMetricSerde; import org.apache.druid.query.aggregation.any.DoubleAnyAggregatorFactory; import org.apache.druid.query.aggregation.any.FloatAnyAggregatorFactory; @@ -83,6 +86,10 @@ public AggregatorsModule() ComplexMetrics.registerSerde(PreComputedHyperUniquesSerde.TYPE_NAME, new PreComputedHyperUniquesSerde()); ComplexMetrics.registerSerde(SerializablePairLongStringComplexMetricSerde.TYPE_NAME, new SerializablePairLongStringComplexMetricSerde()); + ComplexMetrics.registerSerde(SerializablePairLongFloatComplexMetricSerde.TYPE_NAME, new SerializablePairLongFloatComplexMetricSerde()); + ComplexMetrics.registerSerde(SerializablePairLongDoubleComplexMetricSerde.TYPE_NAME, new SerializablePairLongDoubleComplexMetricSerde()); + ComplexMetrics.registerSerde(SerializablePairLongLongComplexMetricSerde.TYPE_NAME, new SerializablePairLongLongComplexMetricSerde()); + setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class); setMixInAnnotation(PostAggregator.class, PostAggregatorMixin.class); diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AbstractSerializableLongObjectPairSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/AbstractSerializableLongObjectPairSerde.java new file mode 100644 index 0000000000000..e754fa8c2f58f --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/AbstractSerializableLongObjectPairSerde.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.collections.SerializablePair; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.segment.GenericColumnSerializer; +import org.apache.druid.segment.column.ColumnBuilder; +import org.apache.druid.segment.data.GenericIndexed; +import org.apache.druid.segment.serde.ComplexColumnPartSupplier; +import org.apache.druid.segment.serde.ComplexMetricExtractor; +import org.apache.druid.segment.serde.ComplexMetricSerde; +import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer; +import org.apache.druid.segment.writeout.SegmentWriteOutMedium; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public abstract class AbstractSerializableLongObjectPairSerde> extends + ComplexMetricSerde +{ + private final Class pairClass; + + public AbstractSerializableLongObjectPairSerde(Class pairClass) + { + this.pairClass = pairClass; + } + + @Override + public ComplexMetricExtractor getExtractor() + { + return new ComplexMetricExtractor() + { + @Override + public Class extractedClass() + { + return pairClass; + } + + @Nullable + @Override + public Object extractValue(InputRow inputRow, String metricName) + { + return inputRow.getMetric(metricName); + } + }; + } + + @Override + public void deserializeColumn(ByteBuffer buffer, ColumnBuilder columnBuilder) + { + final GenericIndexed column = GenericIndexed.read(buffer, getObjectStrategy(), columnBuilder.getFileMapper()); + columnBuilder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column)); + } + + @Override + public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column) + { + return LargeColumnSupportedComplexColumnSerializer.create( + segmentWriteOutMedium, + column, + getObjectStrategy() + ); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongDouble.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongDouble.java new file mode 100644 index 0000000000000..e811d6ac4f91c --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongDouble.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.collections.SerializablePair; + +import javax.annotation.Nullable; + +public class SerializablePairLongDouble extends SerializablePair +{ + @JsonCreator + public SerializablePairLongDouble(@JsonProperty("lhs") Long lhs, @JsonProperty("rhs") @Nullable Double rhs) + { + super(lhs, rhs); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongDoubleComplexMetricSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongDoubleComplexMetricSerde.java new file mode 100644 index 0000000000000..8ebaa2322939c --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongDoubleComplexMetricSerde.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.collections.SerializablePair; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.segment.data.ObjectStrategy; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.Comparator; + +public class SerializablePairLongDoubleComplexMetricSerde extends AbstractSerializableLongObjectPairSerde +{ + public static final String TYPE_NAME = "serializablePairLongDouble"; + + private static final Comparator> COMPARATOR = SerializablePair.createNullHandlingComparator( + Double::compare, + true + ); + + public SerializablePairLongDoubleComplexMetricSerde() + { + super(SerializablePairLongDouble.class); + } + + @Override + public String getTypeName() + { + return TYPE_NAME; + } + + @Override + public ObjectStrategy getObjectStrategy() + { + return new ObjectStrategy() + { + @Override + public int compare(SerializablePairLongDouble o1, SerializablePairLongDouble o2) + { + return COMPARATOR.compare(o1, o2); + } + + @Override + public Class getClazz() + { + return SerializablePairLongDouble.class; + } + + @Override + public SerializablePairLongDouble fromByteBuffer(ByteBuffer buffer, int numBytes) + { + final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); + long lhs = readOnlyBuffer.getLong(); + boolean isNotNull = readOnlyBuffer.get() == NullHandling.IS_NOT_NULL_BYTE; + if (isNotNull) { + return new SerializablePairLongDouble(lhs, readOnlyBuffer.getDouble()); + } else { + return new SerializablePairLongDouble(lhs, null); + } + } + + @Override + public byte[] toBytes(@Nullable SerializablePairLongDouble inPair) + { + if (inPair == null) { + return new byte[]{}; + } + + ByteBuffer bbuf = ByteBuffer.allocate(Long.BYTES + Byte.BYTES + Double.BYTES); + bbuf.putLong(inPair.lhs); + if (inPair.rhs == null) { + bbuf.put(NullHandling.IS_NULL_BYTE); + } else { + bbuf.put(NullHandling.IS_NOT_NULL_BYTE); + bbuf.putDouble(inPair.rhs); + } + return bbuf.array(); + } + }; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongFloat.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongFloat.java new file mode 100644 index 0000000000000..619f2e1528eca --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongFloat.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.collections.SerializablePair; + +import javax.annotation.Nullable; + +public class SerializablePairLongFloat extends SerializablePair +{ + @JsonCreator + public SerializablePairLongFloat(@JsonProperty("lhs") Long lhs, @JsonProperty("rhs") @Nullable Float rhs) + { + super(lhs, rhs); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongFloatComplexMetricSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongFloatComplexMetricSerde.java new file mode 100644 index 0000000000000..7578c3d7c52d9 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongFloatComplexMetricSerde.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.collections.SerializablePair; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.segment.data.ObjectStrategy; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.Comparator; + +public class SerializablePairLongFloatComplexMetricSerde extends AbstractSerializableLongObjectPairSerde +{ + public static final String TYPE_NAME = "serializablePairLongFloat"; + + private static final Comparator> COMPARATOR = SerializablePair.createNullHandlingComparator( + Float::compare, + true + ); + + public SerializablePairLongFloatComplexMetricSerde() + { + super(SerializablePairLongFloat.class); + } + + @Override + public String getTypeName() + { + return TYPE_NAME; + } + + @Override + public ObjectStrategy getObjectStrategy() + { + return new ObjectStrategy() + { + @Override + public int compare(SerializablePairLongFloat o1, SerializablePairLongFloat o2) + { + return COMPARATOR.compare(o1, o2); + } + + @Override + public Class getClazz() + { + return SerializablePairLongFloat.class; + } + + @Override + public SerializablePairLongFloat fromByteBuffer(ByteBuffer buffer, int numBytes) + { + final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); + long lhs = readOnlyBuffer.getLong(); + boolean isNotNull = readOnlyBuffer.get() == NullHandling.IS_NOT_NULL_BYTE; + if (isNotNull) { + return new SerializablePairLongFloat(lhs, readOnlyBuffer.getFloat()); + } else { + return new SerializablePairLongFloat(lhs, null); + } + } + + @Override + public byte[] toBytes(@Nullable SerializablePairLongFloat inPair) + { + if (inPair == null) { + return new byte[]{}; + } + + ByteBuffer bbuf = ByteBuffer.allocate(Long.BYTES + Byte.BYTES + Float.BYTES); + bbuf.putLong(inPair.lhs); + if (inPair.rhs == null) { + bbuf.put(NullHandling.IS_NULL_BYTE); + } else { + bbuf.put(NullHandling.IS_NOT_NULL_BYTE); + bbuf.putFloat(inPair.rhs); + } + return bbuf.array(); + } + }; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongLong.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongLong.java new file mode 100644 index 0000000000000..af06a8c210b4f --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongLong.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.collections.SerializablePair; + +import javax.annotation.Nullable; + +public class SerializablePairLongLong extends SerializablePair +{ + @JsonCreator + public SerializablePairLongLong(@JsonProperty("lhs") Long lhs, @JsonProperty("rhs") @Nullable Long rhs) + { + super(lhs, rhs); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongLongComplexMetricSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongLongComplexMetricSerde.java new file mode 100644 index 0000000000000..37fe5eef183f5 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongLongComplexMetricSerde.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.collections.SerializablePair; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.segment.data.ObjectStrategy; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.Comparator; + +public class SerializablePairLongLongComplexMetricSerde extends AbstractSerializableLongObjectPairSerde +{ + public static final String TYPE_NAME = "serializablePairLongLong"; + + private static final Comparator> COMPARATOR = SerializablePair.createNullHandlingComparator( + Long::compare, + true + ); + + public SerializablePairLongLongComplexMetricSerde() + { + super(SerializablePairLongLong.class); + } + + @Override + public String getTypeName() + { + return TYPE_NAME; + } + + @Override + public ObjectStrategy getObjectStrategy() + { + return new ObjectStrategy() + { + @Override + public int compare(SerializablePairLongLong o1, SerializablePairLongLong o2) + { + return COMPARATOR.compare(o1, o2); + } + + @Override + public Class getClazz() + { + return SerializablePairLongLong.class; + } + + @Override + public SerializablePairLongLong fromByteBuffer(ByteBuffer buffer, int numBytes) + { + final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); + long lhs = readOnlyBuffer.getLong(); + boolean isNotNull = readOnlyBuffer.get() == NullHandling.IS_NOT_NULL_BYTE; + if (isNotNull) { + return new SerializablePairLongLong(lhs, readOnlyBuffer.getLong()); + } else { + return new SerializablePairLongLong(lhs, null); + } + } + + @Override + public byte[] toBytes(@Nullable SerializablePairLongLong inPair) + { + if (inPair == null) { + return new byte[]{}; + } + + ByteBuffer bbuf = ByteBuffer.allocate(Long.BYTES + Byte.BYTES + Long.BYTES); + bbuf.putLong(inPair.lhs); + if (inPair.rhs == null) { + bbuf.put(NullHandling.IS_NULL_BYTE); + } else { + bbuf.put(NullHandling.IS_NOT_NULL_BYTE); + bbuf.putLong(inPair.rhs); + } + return bbuf.array(); + } + }; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregator.java index 8b4a89d0d72e1..f713a15de1907 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregator.java @@ -19,30 +19,36 @@ package org.apache.druid.query.aggregation.first; -import org.apache.druid.collections.SerializablePair; -import org.apache.druid.segment.BaseDoubleColumnValueSelector; +import org.apache.druid.query.aggregation.SerializablePairLongDouble; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; -public class DoubleFirstAggregator extends NumericFirstAggregator +public class DoubleFirstAggregator extends NumericFirstAggregator { double firstValue; - public DoubleFirstAggregator(BaseLongColumnValueSelector timeSelector, BaseDoubleColumnValueSelector valueSelector) + public DoubleFirstAggregator(BaseLongColumnValueSelector timeSelector, ColumnValueSelector valueSelector, boolean needsFoldCheck) { - super(timeSelector, valueSelector); + super(timeSelector, valueSelector, needsFoldCheck); firstValue = 0; } @Override - void setCurrentValue() + void setFirstValue(ColumnValueSelector valueSelector) { firstValue = valueSelector.getDouble(); } + @Override + void setFirstValue(Number firstValue) + { + this.firstValue = firstValue.doubleValue(); + } + @Override public Object get() { - return new SerializablePair<>(firstTime, rhsNull ? null : firstValue); + return new SerializablePairLongDouble(firstTime, rhsNull ? null : firstValue); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java index d575b263f0fbb..dc042aa2ab939 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java @@ -21,17 +21,17 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; import org.apache.druid.collections.SerializablePair; -import org.apache.druid.java.util.common.UOE; import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.SerializablePairLongDouble; +import org.apache.druid.query.aggregation.SerializablePairLongDoubleComplexMetricSerde; import org.apache.druid.query.cache.CacheKeyBuilder; -import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import org.apache.druid.segment.BaseDoubleColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.NilColumnValueSelector; @@ -47,11 +47,15 @@ import java.util.Map; import java.util.Objects; +@JsonTypeName("doubleFirst") public class DoubleFirstAggregatorFactory extends AggregatorFactory { + public static final ColumnType TYPE = ColumnType.ofComplex(SerializablePairLongDoubleComplexMetricSerde.TYPE_NAME); + private static final Aggregator NIL_AGGREGATOR = new DoubleFirstAggregator( NilColumnValueSelector.instance(), - NilColumnValueSelector.instance() + NilColumnValueSelector.instance(), + false ) { @Override @@ -63,7 +67,9 @@ public void aggregate() private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new DoubleFirstBufferAggregator( NilColumnValueSelector.instance(), - NilColumnValueSelector.instance() + NilColumnValueSelector.instance(), + false + ) { @Override @@ -100,13 +106,18 @@ public DoubleFirstAggregatorFactory( @Override public Aggregator factorize(ColumnSelectorFactory metricFactory) { - final BaseDoubleColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); + final ColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); if (valueSelector instanceof NilColumnValueSelector) { return NIL_AGGREGATOR; } else { return new DoubleFirstAggregator( metricFactory.makeColumnValueSelector(timeColumn), - valueSelector + valueSelector, + StringFirstLastUtils.selectorNeedsFoldCheck( + valueSelector, + metricFactory.getColumnCapabilities(fieldName), + SerializablePairLongDouble.class + ) ); } } @@ -114,13 +125,18 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory) @Override public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { - final BaseDoubleColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); + final ColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); if (valueSelector instanceof NilColumnValueSelector) { return NIL_BUFFER_AGGREGATOR; } else { return new DoubleFirstBufferAggregator( metricFactory.makeColumnValueSelector(timeColumn), - valueSelector + valueSelector, + StringFirstLastUtils.selectorNeedsFoldCheck( + valueSelector, + metricFactory.getColumnCapabilities(fieldName), + SerializablePairLongDouble.class + ) ); } } @@ -153,74 +169,13 @@ public Object combine(@Nullable Object lhs, @Nullable Object rhs) @Override public AggregateCombiner makeAggregateCombiner() { - throw new UOE("DoubleFirstAggregatorFactory is not supported during ingestion for rollup"); + return new GenericFirstAggregateCombiner(SerializablePairLongDouble.class); } @Override public AggregatorFactory getCombiningFactory() { - return new DoubleFirstAggregatorFactory(name, name, timeColumn) - { - @Override - public Aggregator factorize(ColumnSelectorFactory metricFactory) - { - final ColumnValueSelector> selector = - metricFactory.makeColumnValueSelector(name); - return new DoubleFirstAggregator(null, null) - { - @Override - public void aggregate() - { - SerializablePair pair = selector.getObject(); - if (pair.lhs < firstTime) { - firstTime = pair.lhs; - if (pair.rhs != null) { - firstValue = pair.rhs; - rhsNull = false; - } else { - rhsNull = true; - } - } - } - }; - } - - @Override - public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) - { - final ColumnValueSelector> selector = - metricFactory.makeColumnValueSelector(name); - return new DoubleFirstBufferAggregator(null, null) - { - @Override - public void putValue(ByteBuffer buf, int position) - { - SerializablePair pair = selector.getObject(); - buf.putDouble(position, pair.rhs); - } - - @Override - public void aggregate(ByteBuffer buf, int position) - { - SerializablePair pair = (SerializablePair) selector.getObject(); - long firstTime = buf.getLong(position); - if (pair.lhs < firstTime) { - if (pair.rhs != null) { - updateTimeWithValue(buf, position, pair.lhs); - } else { - updateTimeWithNull(buf, position, pair.lhs); - } - } - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("selector", selector); - } - }; - } - }; + return new DoubleFirstAggregatorFactory(name, name, timeColumn); } @Override @@ -234,16 +189,16 @@ public Object deserialize(Object object) { Map map = (Map) object; if (map.get("rhs") == null) { - return new SerializablePair<>(((Number) map.get("lhs")).longValue(), null); + return new SerializablePairLongDouble(((Number) map.get("lhs")).longValue(), null); } - return new SerializablePair<>(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).doubleValue()); + return new SerializablePairLongDouble(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).doubleValue()); } @Override @Nullable public Object finalizeComputation(@Nullable Object object) { - return object == null ? null : ((SerializablePair) object).rhs; + return object == null ? null : ((SerializablePairLongDouble) object).rhs; } @Override @@ -284,7 +239,7 @@ public byte[] getCacheKey() public ColumnType getIntermediateType() { // if we don't pretend to be a primitive, group by v1 gets sad and doesn't work because no complex type serde - return storeDoubleAsFloat ? ColumnType.FLOAT : ColumnType.DOUBLE; + return TYPE; } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstBufferAggregator.java index dabade475369a..c00472e923c9c 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstBufferAggregator.java @@ -19,20 +19,21 @@ package org.apache.druid.query.aggregation.first; -import org.apache.druid.collections.SerializablePair; -import org.apache.druid.segment.BaseDoubleColumnValueSelector; +import org.apache.druid.query.aggregation.SerializablePairLongDouble; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; import java.nio.ByteBuffer; -public class DoubleFirstBufferAggregator extends NumericFirstBufferAggregator +public class DoubleFirstBufferAggregator extends NumericFirstBufferAggregator { public DoubleFirstBufferAggregator( BaseLongColumnValueSelector timeSelector, - BaseDoubleColumnValueSelector valueSelector + ColumnValueSelector valueSelector, + boolean needsFoldCheck ) { - super(timeSelector, valueSelector); + super(timeSelector, valueSelector, needsFoldCheck); } @Override @@ -42,16 +43,22 @@ void initValue(ByteBuffer buf, int position) } @Override - void putValue(ByteBuffer buf, int position) + void putValue(ByteBuffer buf, int position, ColumnValueSelector valueSector) { - buf.putDouble(position, valueSelector.getDouble()); + buf.putDouble(position, valueSector.getDouble()); + } + + @Override + void putValue(ByteBuffer buf, int position, Number value) + { + buf.putDouble(position, value.doubleValue()); } @Override public Object get(ByteBuffer buf, int position) { final boolean rhsNull = isValueNull(buf, position); - return new SerializablePair<>(buf.getLong(position), rhsNull ? null : buf.getDouble(position + VALUE_OFFSET)); + return new SerializablePairLongDouble(buf.getLong(position), rhsNull ? null : buf.getDouble(position + VALUE_OFFSET)); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregator.java index 2c0f62934ffef..d02bfdb922cdd 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregator.java @@ -19,33 +19,40 @@ package org.apache.druid.query.aggregation.first; -import org.apache.druid.collections.SerializablePair; -import org.apache.druid.segment.BaseFloatColumnValueSelector; +import org.apache.druid.query.aggregation.SerializablePairLongFloat; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; -public class FloatFirstAggregator extends NumericFirstAggregator +public class FloatFirstAggregator extends NumericFirstAggregator { float firstValue; public FloatFirstAggregator( BaseLongColumnValueSelector timeSelector, - BaseFloatColumnValueSelector valueSelector + ColumnValueSelector valueSelector, + boolean needsFoldCheck ) { - super(timeSelector, valueSelector); + super(timeSelector, valueSelector, needsFoldCheck); firstValue = 0; } @Override - void setCurrentValue() + void setFirstValue(ColumnValueSelector valueSelector) { firstValue = valueSelector.getFloat(); } + @Override + void setFirstValue(Number firstValue) + { + this.firstValue = firstValue.floatValue(); + } + @Override public Object get() { - return new SerializablePair<>(firstTime, rhsNull ? null : firstValue); + return new SerializablePairLongFloat(firstTime, rhsNull ? null : firstValue); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java index be6a0f6aad97b..7884ea4a92804 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java @@ -21,17 +21,17 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; import org.apache.druid.collections.SerializablePair; -import org.apache.druid.java.util.common.UOE; import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.SerializablePairLongFloat; +import org.apache.druid.query.aggregation.SerializablePairLongFloatComplexMetricSerde; import org.apache.druid.query.cache.CacheKeyBuilder; -import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import org.apache.druid.segment.BaseFloatColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.NilColumnValueSelector; @@ -47,11 +47,15 @@ import java.util.Map; import java.util.Objects; +@JsonTypeName("floatFirst") public class FloatFirstAggregatorFactory extends AggregatorFactory { + public static final ColumnType TYPE = ColumnType.ofComplex(SerializablePairLongFloatComplexMetricSerde.TYPE_NAME); + private static final Aggregator NIL_AGGREGATOR = new FloatFirstAggregator( NilColumnValueSelector.instance(), - NilColumnValueSelector.instance() + NilColumnValueSelector.instance(), + false ) { @Override @@ -63,7 +67,8 @@ public void aggregate() private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new FloatFirstBufferAggregator( NilColumnValueSelector.instance(), - NilColumnValueSelector.instance() + NilColumnValueSelector.instance(), + false ) { @Override @@ -98,13 +103,18 @@ public FloatFirstAggregatorFactory( @Override public Aggregator factorize(ColumnSelectorFactory metricFactory) { - final BaseFloatColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); + final ColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); if (valueSelector instanceof NilColumnValueSelector) { return NIL_AGGREGATOR; } else { return new FloatFirstAggregator( metricFactory.makeColumnValueSelector(timeColumn), - valueSelector + valueSelector, + StringFirstLastUtils.selectorNeedsFoldCheck( + valueSelector, + metricFactory.getColumnCapabilities(fieldName), + SerializablePairLongFloat.class + ) ); } } @@ -112,13 +122,18 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory) @Override public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { - final BaseFloatColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); + final ColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); if (valueSelector instanceof NilColumnValueSelector) { return NIL_BUFFER_AGGREGATOR; } else { return new FloatFirstBufferAggregator( metricFactory.makeColumnValueSelector(timeColumn), - valueSelector + valueSelector, + StringFirstLastUtils.selectorNeedsFoldCheck( + valueSelector, + metricFactory.getColumnCapabilities(fieldName), + SerializablePairLongFloat.class + ) ); } } @@ -151,73 +166,13 @@ public Object combine(@Nullable Object lhs, @Nullable Object rhs) @Override public AggregateCombiner makeAggregateCombiner() { - throw new UOE("FloatFirstAggregatorFactory is not supported during ingestion for rollup"); + return new GenericFirstAggregateCombiner(SerializablePairLongFloat.class); } @Override public AggregatorFactory getCombiningFactory() { - - return new FloatFirstAggregatorFactory(name, name, timeColumn) - { - @Override - public Aggregator factorize(ColumnSelectorFactory metricFactory) - { - final ColumnValueSelector> selector = metricFactory.makeColumnValueSelector(name); - return new FloatFirstAggregator(null, null) - { - @Override - public void aggregate() - { - SerializablePair pair = selector.getObject(); - if (pair.lhs < firstTime) { - firstTime = pair.lhs; - if (pair.rhs != null) { - firstValue = pair.rhs; - rhsNull = false; - } else { - rhsNull = true; - } - } - } - }; - } - - @Override - public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) - { - final ColumnValueSelector> selector = metricFactory.makeColumnValueSelector(name); - return new FloatFirstBufferAggregator(null, null) - { - @Override - public void putValue(ByteBuffer buf, int position) - { - SerializablePair pair = selector.getObject(); - buf.putFloat(position, pair.rhs); - } - - @Override - public void aggregate(ByteBuffer buf, int position) - { - SerializablePair pair = selector.getObject(); - long firstTime = buf.getLong(position); - if (pair.lhs < firstTime) { - if (pair.rhs != null) { - updateTimeWithValue(buf, position, pair.lhs); - } else { - updateTimeWithNull(buf, position, pair.lhs); - } - } - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("selector", selector); - } - }; - } - }; + return new FloatFirstAggregatorFactory(name, name, timeColumn); } @Override @@ -231,16 +186,16 @@ public Object deserialize(Object object) { Map map = (Map) object; if (map.get("rhs") == null) { - return new SerializablePair<>(((Number) map.get("lhs")).longValue(), null); + return new SerializablePairLongFloat(((Number) map.get("lhs")).longValue(), null); } - return new SerializablePair<>(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).floatValue()); + return new SerializablePairLongFloat(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).floatValue()); } @Override @Nullable public Object finalizeComputation(@Nullable Object object) { - return object == null ? null : ((SerializablePair) object).rhs; + return object == null ? null : ((SerializablePairLongFloat) object).rhs; } @Override @@ -281,7 +236,7 @@ public byte[] getCacheKey() public ColumnType getIntermediateType() { // if we don't pretend to be a primitive, group by v1 gets sad and doesn't work because no complex type serde - return ColumnType.FLOAT; + return TYPE; } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstBufferAggregator.java index cf7d272b0085f..b8881ee9500f0 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstBufferAggregator.java @@ -19,20 +19,21 @@ package org.apache.druid.query.aggregation.first; -import org.apache.druid.collections.SerializablePair; -import org.apache.druid.segment.BaseFloatColumnValueSelector; +import org.apache.druid.query.aggregation.SerializablePairLongFloat; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; import java.nio.ByteBuffer; -public class FloatFirstBufferAggregator extends NumericFirstBufferAggregator +public class FloatFirstBufferAggregator extends NumericFirstBufferAggregator { public FloatFirstBufferAggregator( BaseLongColumnValueSelector timeSelector, - BaseFloatColumnValueSelector valueSelector + ColumnValueSelector valueSelector, + boolean needsFoldCheck ) { - super(timeSelector, valueSelector); + super(timeSelector, valueSelector, needsFoldCheck); } @Override @@ -42,16 +43,22 @@ void initValue(ByteBuffer buf, int position) } @Override - void putValue(ByteBuffer buf, int position) + void putValue(ByteBuffer buf, int position, ColumnValueSelector valueSector) { - buf.putFloat(position, valueSelector.getFloat()); + buf.putFloat(position, valueSector.getFloat()); + } + + @Override + void putValue(ByteBuffer buf, int position, Number value) + { + buf.putFloat(position, value.floatValue()); } @Override public Object get(ByteBuffer buf, int position) { final boolean rhsNull = isValueNull(buf, position); - return new SerializablePair<>(buf.getLong(position), rhsNull ? null : buf.getFloat(position + VALUE_OFFSET)); + return new SerializablePairLongFloat(buf.getLong(position), rhsNull ? null : buf.getFloat(position + VALUE_OFFSET)); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/GenericFirstAggregateCombiner.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/GenericFirstAggregateCombiner.java new file mode 100644 index 0000000000000..cd72c306b798f --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/GenericFirstAggregateCombiner.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.first; + +import com.google.common.primitives.Longs; +import org.apache.druid.collections.SerializablePair; +import org.apache.druid.query.aggregation.ObjectAggregateCombiner; +import org.apache.druid.segment.ColumnValueSelector; + +import javax.annotation.Nullable; + +public class GenericFirstAggregateCombiner> extends ObjectAggregateCombiner +{ + private T firstValue; + private final Class pairClass; + + public GenericFirstAggregateCombiner(Class pairClass) + { + this.pairClass = pairClass; + } + + @Override + public void reset(ColumnValueSelector selector) + { + firstValue = (T) selector.getObject(); + } + + @Override + public void fold(ColumnValueSelector selector) + { + T newValue = (T) selector.getObject(); + if (Longs.compare(firstValue.lhs, newValue.lhs) > 0) { + firstValue = newValue; + } + } + + @Nullable + @Override + public T getObject() + { + return firstValue; + } + + @Override + public Class classOfObject() + { + return pairClass; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregator.java index 8cda544521ea8..af070dbb5e8c1 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregator.java @@ -19,29 +19,36 @@ package org.apache.druid.query.aggregation.first; -import org.apache.druid.collections.SerializablePair; +import org.apache.druid.query.aggregation.SerializablePairLongLong; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; -public class LongFirstAggregator extends NumericFirstAggregator +public class LongFirstAggregator extends NumericFirstAggregator { long firstValue; - public LongFirstAggregator(BaseLongColumnValueSelector timeSelector, BaseLongColumnValueSelector valueSelector) + public LongFirstAggregator(BaseLongColumnValueSelector timeSelector, ColumnValueSelector valueSelector, boolean needsFoldCheck) { - super(timeSelector, valueSelector); + super(timeSelector, valueSelector, needsFoldCheck); firstValue = 0; } @Override - void setCurrentValue() + void setFirstValue(ColumnValueSelector valueSelector) { firstValue = valueSelector.getLong(); } + @Override + void setFirstValue(Number firstValue) + { + this.firstValue = firstValue.longValue(); + } + @Override public Object get() { - return new SerializablePair<>(firstTime, rhsNull ? null : firstValue); + return new SerializablePairLongLong(firstTime, rhsNull ? null : firstValue); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java index 695d01b3a4a7a..3ecaabf15971a 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java @@ -21,17 +21,17 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; import org.apache.druid.collections.SerializablePair; -import org.apache.druid.java.util.common.UOE; import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.SerializablePairLongLong; +import org.apache.druid.query.aggregation.SerializablePairLongLongComplexMetricSerde; import org.apache.druid.query.cache.CacheKeyBuilder; -import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.NilColumnValueSelector; @@ -46,11 +46,15 @@ import java.util.List; import java.util.Map; +@JsonTypeName("longFirst") public class LongFirstAggregatorFactory extends AggregatorFactory { + public static final ColumnType TYPE = ColumnType.ofComplex(SerializablePairLongLongComplexMetricSerde.TYPE_NAME); + private static final Aggregator NIL_AGGREGATOR = new LongFirstAggregator( NilColumnValueSelector.instance(), - NilColumnValueSelector.instance() + NilColumnValueSelector.instance(), + false ) { @Override @@ -62,7 +66,8 @@ public void aggregate() private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new LongFirstBufferAggregator( NilColumnValueSelector.instance(), - NilColumnValueSelector.instance() + NilColumnValueSelector.instance(), + false ) { @Override @@ -97,13 +102,18 @@ public LongFirstAggregatorFactory( @Override public Aggregator factorize(ColumnSelectorFactory metricFactory) { - final BaseLongColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); + final ColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); if (valueSelector instanceof NilColumnValueSelector) { return NIL_AGGREGATOR; } else { return new LongFirstAggregator( metricFactory.makeColumnValueSelector(timeColumn), - valueSelector + valueSelector, + StringFirstLastUtils.selectorNeedsFoldCheck( + valueSelector, + metricFactory.getColumnCapabilities(fieldName), + SerializablePairLongLong.class + ) ); } } @@ -111,13 +121,18 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory) @Override public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { - final BaseLongColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); + final ColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); if (valueSelector instanceof NilColumnValueSelector) { return NIL_BUFFER_AGGREGATOR; } else { return new LongFirstBufferAggregator( metricFactory.makeColumnValueSelector(timeColumn), - valueSelector + valueSelector, + StringFirstLastUtils.selectorNeedsFoldCheck( + valueSelector, + metricFactory.getColumnCapabilities(fieldName), + SerializablePairLongLong.class + ) ); } } @@ -150,72 +165,13 @@ public Object combine(@Nullable Object lhs, @Nullable Object rhs) @Override public AggregateCombiner makeAggregateCombiner() { - throw new UOE("LongFirstAggregatorFactory is not supported during ingestion for rollup"); + return new GenericFirstAggregateCombiner(SerializablePairLongLong.class); } @Override public AggregatorFactory getCombiningFactory() { - return new LongFirstAggregatorFactory(name, name, timeColumn) - { - @Override - public Aggregator factorize(ColumnSelectorFactory metricFactory) - { - final ColumnValueSelector> selector = metricFactory.makeColumnValueSelector(name); - return new LongFirstAggregator(null, null) - { - @Override - public void aggregate() - { - SerializablePair pair = selector.getObject(); - if (pair.lhs < firstTime) { - firstTime = pair.lhs; - if (pair.rhs != null) { - firstValue = pair.rhs; - rhsNull = false; - } else { - rhsNull = true; - } - } - } - }; - } - - @Override - public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) - { - final ColumnValueSelector> selector = metricFactory.makeColumnValueSelector(name); - return new LongFirstBufferAggregator(null, null) - { - @Override - public void putValue(ByteBuffer buf, int position) - { - SerializablePair pair = selector.getObject(); - buf.putLong(position, pair.rhs); - } - - @Override - public void aggregate(ByteBuffer buf, int position) - { - SerializablePair pair = selector.getObject(); - long firstTime = buf.getLong(position); - if (pair.lhs < firstTime) { - if (pair.rhs != null) { - updateTimeWithValue(buf, position, pair.lhs); - } else { - updateTimeWithNull(buf, position, pair.lhs); - } - } - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("selector", selector); - } - }; - } - }; + return new LongFirstAggregatorFactory(name, name, timeColumn); } @Override @@ -229,16 +185,16 @@ public Object deserialize(Object object) { Map map = (Map) object; if (map.get("rhs") == null) { - return new SerializablePair<>(((Number) map.get("lhs")).longValue(), null); + return new SerializablePairLongLong(((Number) map.get("lhs")).longValue(), null); } - return new SerializablePair<>(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).longValue()); + return new SerializablePairLongLong(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).longValue()); } @Override @Nullable public Object finalizeComputation(@Nullable Object object) { - return object == null ? null : ((SerializablePair) object).rhs; + return object == null ? null : ((SerializablePairLongLong) object).rhs; } @Override @@ -279,7 +235,7 @@ public byte[] getCacheKey() public ColumnType getIntermediateType() { // if we don't pretend to be a primitive, group by v1 gets sad and doesn't work because no complex type serde - return ColumnType.LONG; + return TYPE; } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstBufferAggregator.java index 582cda1601531..426d4c64816d9 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstBufferAggregator.java @@ -19,16 +19,17 @@ package org.apache.druid.query.aggregation.first; -import org.apache.druid.collections.SerializablePair; +import org.apache.druid.query.aggregation.SerializablePairLongLong; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; import java.nio.ByteBuffer; -public class LongFirstBufferAggregator extends NumericFirstBufferAggregator +public class LongFirstBufferAggregator extends NumericFirstBufferAggregator { - public LongFirstBufferAggregator(BaseLongColumnValueSelector timeSelector, BaseLongColumnValueSelector valueSelector) + public LongFirstBufferAggregator(BaseLongColumnValueSelector timeSelector, ColumnValueSelector valueSelector, boolean needsFoldCheck) { - super(timeSelector, valueSelector); + super(timeSelector, valueSelector, needsFoldCheck); } @Override @@ -38,16 +39,22 @@ void initValue(ByteBuffer buf, int position) } @Override - void putValue(ByteBuffer buf, int position) + void putValue(ByteBuffer buf, int position, ColumnValueSelector valueSelector) { buf.putLong(position, valueSelector.getLong()); } + @Override + void putValue(ByteBuffer buf, int position, Number value) + { + buf.putLong(position, value.longValue()); + } + @Override public Object get(ByteBuffer buf, int position) { final boolean rhsNull = isValueNull(buf, position); - return new SerializablePair<>(buf.getLong(position), rhsNull ? null : buf.getLong(position + VALUE_OFFSET)); + return new SerializablePairLongLong(buf.getLong(position), rhsNull ? null : buf.getLong(position + VALUE_OFFSET)); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java index c8a537438b013..b7f88f963fed6 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java @@ -19,28 +19,31 @@ package org.apache.druid.query.aggregation.first; +import org.apache.druid.collections.SerializablePair; import org.apache.druid.common.config.NullHandling; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.segment.BaseLongColumnValueSelector; -import org.apache.druid.segment.BaseNullableColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; /** * Base type for on heap 'first' aggregator for primitive numeric column selectors */ -public abstract class NumericFirstAggregator implements Aggregator +public abstract class NumericFirstAggregator implements Aggregator { private final boolean useDefault = NullHandling.replaceWithDefault(); private final BaseLongColumnValueSelector timeSelector; + private final boolean needsFoldCheck; - final TSelector valueSelector; + final ColumnValueSelector valueSelector; long firstTime; boolean rhsNull; - public NumericFirstAggregator(BaseLongColumnValueSelector timeSelector, TSelector valueSelector) + public NumericFirstAggregator(BaseLongColumnValueSelector timeSelector, ColumnValueSelector valueSelector, boolean needsFoldCheck) { this.timeSelector = timeSelector; this.valueSelector = valueSelector; + this.needsFoldCheck = needsFoldCheck; firstTime = Long.MAX_VALUE; rhsNull = !useDefault; @@ -49,7 +52,12 @@ public NumericFirstAggregator(BaseLongColumnValueSelector timeSelector, TSelecto /** * Store the current primitive typed 'first' value */ - abstract void setCurrentValue(); + abstract void setFirstValue(ColumnValueSelector valueSelector); + + /** + * Store a non-null first value + */ + abstract void setFirstValue(Number firstValue); @Override public void aggregate() @@ -57,13 +65,33 @@ public void aggregate() if (timeSelector.isNull()) { return; } + + if (needsFoldCheck) { + final Object object = valueSelector.getObject(); + if (object instanceof SerializablePair) { + SerializablePair inPair = (SerializablePair) object; + + if (inPair != null && inPair.lhs < firstTime) { + firstTime = inPair.lhs; + if (inPair.rhs == null) { + rhsNull = true; + } else { + rhsNull = false; + setFirstValue(inPair.rhs); + } + } + return; + } + } + long time = timeSelector.getLong(); if (time < firstTime) { firstTime = time; if (useDefault || !valueSelector.isNull()) { - setCurrentValue(); + setFirstValue(valueSelector); rhsNull = false; } else { + setFirstValue(0); rhsNull = true; } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java index 159c6e1317b31..c9911399ed11d 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java @@ -19,32 +19,34 @@ package org.apache.druid.query.aggregation.first; +import org.apache.druid.collections.SerializablePair; import org.apache.druid.common.config.NullHandling; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.BaseLongColumnValueSelector; -import org.apache.druid.segment.BaseNullableColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; import java.nio.ByteBuffer; /** * Base type for buffer based 'first' aggregator for primitive numeric column selectors */ -public abstract class NumericFirstBufferAggregator - implements BufferAggregator +public abstract class NumericFirstBufferAggregator implements BufferAggregator { static final int NULL_OFFSET = Long.BYTES; static final int VALUE_OFFSET = NULL_OFFSET + Byte.BYTES; private final boolean useDefault = NullHandling.replaceWithDefault(); private final BaseLongColumnValueSelector timeSelector; + private final boolean needsFoldCheck; - final TSelector valueSelector; + final ColumnValueSelector valueSelector; - public NumericFirstBufferAggregator(BaseLongColumnValueSelector timeSelector, TSelector valueSelector) + public NumericFirstBufferAggregator(BaseLongColumnValueSelector timeSelector, ColumnValueSelector valueSelector, boolean needsFoldCheck) { this.timeSelector = timeSelector; this.valueSelector = valueSelector; + this.needsFoldCheck = needsFoldCheck; } /** @@ -55,13 +57,22 @@ public NumericFirstBufferAggregator(BaseLongColumnValueSelector timeSelector, TS /** * Place the primitive value in the buffer at the position of {@link #VALUE_OFFSET} */ - abstract void putValue(ByteBuffer buf, int position); + abstract void putValue(ByteBuffer buf, int position, ColumnValueSelector valueSector); - void updateTimeWithValue(ByteBuffer buf, int position, long time) + abstract void putValue(ByteBuffer buf, int position, Number value); + + void updateTimeWithValue(ByteBuffer buf, int position, long time, ColumnValueSelector valueSelector) { buf.putLong(position, time); buf.put(position + NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE); - putValue(buf, position + VALUE_OFFSET); + putValue(buf, position + VALUE_OFFSET, valueSelector); + } + + void updateTimeWithValue(ByteBuffer buf, int position, long time, Number value) + { + buf.putLong(position, time); + buf.put(position + NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE); + putValue(buf, position + VALUE_OFFSET, value); } void updateTimeWithNull(ByteBuffer buf, int position, long time) @@ -89,11 +100,28 @@ public void aggregate(ByteBuffer buf, int position) if (timeSelector.isNull()) { return; } - long time = timeSelector.getLong(); + long firstTime = buf.getLong(position); + if (needsFoldCheck) { + final Object object = valueSelector.getObject(); + if (object instanceof SerializablePair) { + final SerializablePair inPair = (SerializablePair) object; + if (inPair != null && inPair.lhs < firstTime) { + if (inPair.rhs == null) { + updateTimeWithNull(buf, position, inPair.lhs); + } else { + updateTimeWithValue(buf, position, inPair.lhs, inPair.rhs); + } + } + return; + } + } + + long time = timeSelector.getLong(); + if (time < firstTime) { if (useDefault || !valueSelector.isNull()) { - updateTimeWithValue(buf, position, time); + updateTimeWithValue(buf, position, time, valueSelector); } else { updateTimeWithNull(buf, position, time); } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java index f7624f4541b39..ed880f50f6100 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java @@ -133,7 +133,7 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory) metricFactory.makeColumnValueSelector(timeColumn), valueSelector, maxStringBytes, - StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName)) + StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName), SerializablePairLongString.class) ); } } @@ -149,7 +149,7 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) metricFactory.makeColumnValueSelector(timeColumn), valueSelector, maxStringBytes, - StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName)) + StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName), SerializablePairLongString.class) ); } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java index 6b93be7d70806..0298867bc77b2 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java @@ -42,7 +42,8 @@ public class StringFirstLastUtils */ public static boolean selectorNeedsFoldCheck( final BaseObjectColumnValueSelector valueSelector, - @Nullable final ColumnCapabilities valueSelectorCapabilities + @Nullable final ColumnCapabilities valueSelectorCapabilities, + Class pairClass ) { if (valueSelectorCapabilities != null && !valueSelectorCapabilities.is(ValueType.COMPLEX)) { @@ -57,8 +58,8 @@ public static boolean selectorNeedsFoldCheck( // Check if the selector class could possibly be a SerializablePairLongString (either a superclass or subclass). final Class clazz = valueSelector.classOfObject(); - return clazz.isAssignableFrom(SerializablePairLongString.class) - || SerializablePairLongString.class.isAssignableFrom(clazz); + return clazz.isAssignableFrom(pairClass) + || pairClass.isAssignableFrom(clazz); } /** diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregator.java index 3f6a1506bad65..e5eca666986d3 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregator.java @@ -19,30 +19,36 @@ package org.apache.druid.query.aggregation.last; -import org.apache.druid.collections.SerializablePair; -import org.apache.druid.segment.BaseDoubleColumnValueSelector; +import org.apache.druid.query.aggregation.SerializablePairLongDouble; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; -public class DoubleLastAggregator extends NumericLastAggregator +public class DoubleLastAggregator extends NumericLastAggregator { double lastValue; - public DoubleLastAggregator(BaseLongColumnValueSelector timeSelector, BaseDoubleColumnValueSelector valueSelector) + public DoubleLastAggregator(BaseLongColumnValueSelector timeSelector, ColumnValueSelector valueSelector, boolean needsFoldCheck) { - super(timeSelector, valueSelector); + super(timeSelector, valueSelector, needsFoldCheck); lastValue = 0; } @Override - void setCurrentValue() + void setLastValue(ColumnValueSelector valueSelector) { lastValue = valueSelector.getDouble(); } + @Override + void setLastValue(Number lastValue) + { + this.lastValue = lastValue.doubleValue(); + } + @Override public Object get() { - return new SerializablePair<>(lastTime, rhsNull ? null : lastValue); + return new SerializablePairLongDouble(lastTime, rhsNull ? null : lastValue); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java index 5e3fa6667928d..de6e707310f75 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java @@ -21,21 +21,22 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; import org.apache.druid.collections.SerializablePair; -import org.apache.druid.java.util.common.UOE; import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.SerializablePairLongDouble; +import org.apache.druid.query.aggregation.SerializablePairLongDoubleComplexMetricSerde; import org.apache.druid.query.aggregation.VectorAggregator; import org.apache.druid.query.aggregation.any.NumericNilVectorAggregator; import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory; +import org.apache.druid.query.aggregation.first.StringFirstLastUtils; import org.apache.druid.query.cache.CacheKeyBuilder; -import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import org.apache.druid.segment.BaseDoubleColumnValueSelector; import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; @@ -56,11 +57,14 @@ import java.util.Map; import java.util.Objects; +@JsonTypeName("lastDouble") public class DoubleLastAggregatorFactory extends AggregatorFactory { + public static final ColumnType TYPE = ColumnType.ofComplex(SerializablePairLongDoubleComplexMetricSerde.TYPE_NAME); private static final Aggregator NIL_AGGREGATOR = new DoubleLastAggregator( NilColumnValueSelector.instance(), - NilColumnValueSelector.instance() + NilColumnValueSelector.instance(), + false ) { @Override @@ -72,7 +76,8 @@ public void aggregate() private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new DoubleLastBufferAggregator( NilColumnValueSelector.instance(), - NilColumnValueSelector.instance() + NilColumnValueSelector.instance(), + false ) { @Override @@ -105,13 +110,18 @@ public DoubleLastAggregatorFactory( @Override public Aggregator factorize(ColumnSelectorFactory metricFactory) { - final BaseDoubleColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); + final ColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); if (valueSelector instanceof NilColumnValueSelector) { return NIL_AGGREGATOR; } else { return new DoubleLastAggregator( metricFactory.makeColumnValueSelector(timeColumn), - valueSelector + valueSelector, + StringFirstLastUtils.selectorNeedsFoldCheck( + valueSelector, + metricFactory.getColumnCapabilities(fieldName), + SerializablePairLongDouble.class + ) ); } } @@ -142,13 +152,18 @@ public VectorAggregator factorizeVector( @Override public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { - final BaseDoubleColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); + final ColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); if (valueSelector instanceof NilColumnValueSelector) { return NIL_BUFFER_AGGREGATOR; } else { return new DoubleLastBufferAggregator( metricFactory.makeColumnValueSelector(timeColumn), - valueSelector + valueSelector, + StringFirstLastUtils.selectorNeedsFoldCheck( + valueSelector, + metricFactory.getColumnCapabilities(fieldName), + SerializablePairLongDouble.class + ) ); } } @@ -181,74 +196,13 @@ public Object combine(@Nullable Object lhs, @Nullable Object rhs) @Override public AggregateCombiner makeAggregateCombiner() { - throw new UOE("DoubleLastAggregatorFactory is not supported during ingestion for rollup"); + return new GenericLastAggregateCombiner(SerializablePairLongDouble.class); } @Override public AggregatorFactory getCombiningFactory() { - return new DoubleLastAggregatorFactory(name, name, timeColumn) - { - @Override - public Aggregator factorize(ColumnSelectorFactory metricFactory) - { - final ColumnValueSelector> selector = - metricFactory.makeColumnValueSelector(name); - return new DoubleLastAggregator(null, null) - { - @Override - public void aggregate() - { - SerializablePair pair = selector.getObject(); - if (pair.lhs >= lastTime) { - lastTime = pair.lhs; - if (pair.rhs != null) { - lastValue = pair.rhs; - rhsNull = false; - } else { - rhsNull = true; - } - } - } - }; - } - - @Override - public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) - { - final ColumnValueSelector> selector = - metricFactory.makeColumnValueSelector(name); - return new DoubleLastBufferAggregator(null, null) - { - @Override - public void putValue(ByteBuffer buf, int position) - { - SerializablePair pair = selector.getObject(); - buf.putDouble(position, pair.rhs); - } - - @Override - public void aggregate(ByteBuffer buf, int position) - { - SerializablePair pair = selector.getObject(); - long lastTime = buf.getLong(position); - if (pair.lhs >= lastTime) { - if (pair.rhs != null) { - updateTimeWithValue(buf, position, pair.lhs); - } else { - updateTimeWithNull(buf, position, pair.lhs); - } - } - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("selector", selector); - } - }; - } - }; + return new DoubleLastAggregatorFactory(name, name, timeColumn); } @Override @@ -262,16 +216,16 @@ public Object deserialize(Object object) { Map map = (Map) object; if (map.get("rhs") == null) { - return new SerializablePair<>(((Number) map.get("lhs")).longValue(), null); + return new SerializablePairLongDouble(((Number) map.get("lhs")).longValue(), null); } - return new SerializablePair<>(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).doubleValue()); + return new SerializablePairLongDouble(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).doubleValue()); } @Override @Nullable public Object finalizeComputation(@Nullable Object object) { - return object == null ? null : ((SerializablePair) object).rhs; + return object == null ? null : ((SerializablePairLongDouble) object).rhs; } @Override @@ -312,7 +266,7 @@ public byte[] getCacheKey() public ColumnType getIntermediateType() { // if we don't pretend to be a primitive, group by v1 gets sad and doesn't work because no complex type serde - return storeDoubleAsFloat ? ColumnType.FLOAT : ColumnType.DOUBLE; + return TYPE; } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastBufferAggregator.java index 8acddce53a831..d605180951c49 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastBufferAggregator.java @@ -19,20 +19,21 @@ package org.apache.druid.query.aggregation.last; -import org.apache.druid.collections.SerializablePair; -import org.apache.druid.segment.BaseDoubleColumnValueSelector; +import org.apache.druid.query.aggregation.SerializablePairLongDouble; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; import java.nio.ByteBuffer; -public class DoubleLastBufferAggregator extends NumericLastBufferAggregator +public class DoubleLastBufferAggregator extends NumericLastBufferAggregator { public DoubleLastBufferAggregator( BaseLongColumnValueSelector timeSelector, - BaseDoubleColumnValueSelector valueSelector + ColumnValueSelector valueSelector, + boolean needsFoldCheck ) { - super(timeSelector, valueSelector); + super(timeSelector, valueSelector, needsFoldCheck); } @Override @@ -42,16 +43,22 @@ void initValue(ByteBuffer buf, int position) } @Override - void putValue(ByteBuffer buf, int position) + void putValue(ByteBuffer buf, int position, ColumnValueSelector valueSelector) { buf.putDouble(position, valueSelector.getDouble()); } + @Override + void putValue(ByteBuffer buf, int position, Number value) + { + buf.putDouble(position, value.doubleValue()); + } + @Override public Object get(ByteBuffer buf, int position) { final boolean rhsNull = isValueNull(buf, position); - return new SerializablePair<>(buf.getLong(position), rhsNull ? null : buf.getDouble(position + VALUE_OFFSET)); + return new SerializablePairLongDouble(buf.getLong(position), rhsNull ? null : buf.getDouble(position + VALUE_OFFSET)); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregator.java index 1381ccb18b7a2..c55fbb997baa3 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregator.java @@ -19,30 +19,37 @@ package org.apache.druid.query.aggregation.last; -import org.apache.druid.collections.SerializablePair; -import org.apache.druid.segment.BaseFloatColumnValueSelector; +import org.apache.druid.query.aggregation.SerializablePairLongFloat; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; -public class FloatLastAggregator extends NumericLastAggregator +public class FloatLastAggregator extends NumericLastAggregator { float lastValue; - public FloatLastAggregator(BaseLongColumnValueSelector timeSelector, BaseFloatColumnValueSelector valueSelector) + public FloatLastAggregator(BaseLongColumnValueSelector timeSelector, ColumnValueSelector valueSelector, boolean needsFoldCheck) { - super(timeSelector, valueSelector); + super(timeSelector, valueSelector, needsFoldCheck); lastValue = 0; } @Override - void setCurrentValue() + void setLastValue(ColumnValueSelector valueSelector) { lastValue = valueSelector.getFloat(); } + @Override + void setLastValue(Number lastValue) + { + this.lastValue = lastValue.floatValue(); + } + + @Override public Object get() { - return new SerializablePair<>(lastTime, rhsNull ? null : lastValue); + return new SerializablePairLongFloat(lastTime, rhsNull ? null : lastValue); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java index ff23c3d96dc44..a42491013dbb8 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java @@ -21,21 +21,22 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; import org.apache.druid.collections.SerializablePair; -import org.apache.druid.java.util.common.UOE; import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.SerializablePairLongFloat; +import org.apache.druid.query.aggregation.SerializablePairLongFloatComplexMetricSerde; import org.apache.druid.query.aggregation.VectorAggregator; import org.apache.druid.query.aggregation.any.NumericNilVectorAggregator; import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory; +import org.apache.druid.query.aggregation.first.StringFirstLastUtils; import org.apache.druid.query.cache.CacheKeyBuilder; -import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import org.apache.druid.segment.BaseFloatColumnValueSelector; import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; @@ -56,11 +57,14 @@ import java.util.Map; import java.util.Objects; +@JsonTypeName("lastFloat") public class FloatLastAggregatorFactory extends AggregatorFactory { + public static final ColumnType TYPE = ColumnType.ofComplex(SerializablePairLongFloatComplexMetricSerde.TYPE_NAME); private static final Aggregator NIL_AGGREGATOR = new FloatLastAggregator( NilColumnValueSelector.instance(), - NilColumnValueSelector.instance() + NilColumnValueSelector.instance(), + false ) { @Override @@ -72,7 +76,8 @@ public void aggregate() private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new FloatLastBufferAggregator( NilColumnValueSelector.instance(), - NilColumnValueSelector.instance() + NilColumnValueSelector.instance(), + false ) { @Override @@ -103,13 +108,18 @@ public FloatLastAggregatorFactory( @Override public Aggregator factorize(ColumnSelectorFactory metricFactory) { - final BaseFloatColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); + final ColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); if (valueSelector instanceof NilColumnValueSelector) { return NIL_AGGREGATOR; } else { return new FloatLastAggregator( metricFactory.makeColumnValueSelector(timeColumn), - valueSelector + valueSelector, + StringFirstLastUtils.selectorNeedsFoldCheck( + valueSelector, + metricFactory.getColumnCapabilities(fieldName), + SerializablePairLongFloat.class + ) ); } } @@ -117,13 +127,18 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory) @Override public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { - final BaseFloatColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); + final ColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); if (valueSelector instanceof NilColumnValueSelector) { return NIL_BUFFER_AGGREGATOR; } else { return new FloatLastBufferAggregator( metricFactory.makeColumnValueSelector(timeColumn), - valueSelector + valueSelector, + StringFirstLastUtils.selectorNeedsFoldCheck( + valueSelector, + metricFactory.getColumnCapabilities(fieldName), + SerializablePairLongFloat.class + ) ); } } @@ -179,72 +194,13 @@ public Object combine(@Nullable Object lhs, @Nullable Object rhs) @Override public AggregateCombiner makeAggregateCombiner() { - throw new UOE("FloatLastAggregatorFactory is not supported during ingestion for rollup"); + return new GenericLastAggregateCombiner(SerializablePairLongFloat.class); } @Override public AggregatorFactory getCombiningFactory() { - return new FloatLastAggregatorFactory(name, name, timeColumn) - { - @Override - public Aggregator factorize(ColumnSelectorFactory metricFactory) - { - ColumnValueSelector> selector = metricFactory.makeColumnValueSelector(name); - return new FloatLastAggregator(null, null) - { - @Override - public void aggregate() - { - SerializablePair pair = selector.getObject(); - if (pair.lhs >= lastTime) { - lastTime = pair.lhs; - if (pair.rhs != null) { - lastValue = pair.rhs; - rhsNull = false; - } else { - rhsNull = true; - } - } - } - }; - } - - @Override - public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) - { - ColumnValueSelector> selector = metricFactory.makeColumnValueSelector(name); - return new FloatLastBufferAggregator(null, null) - { - @Override - public void putValue(ByteBuffer buf, int position) - { - SerializablePair pair = selector.getObject(); - buf.putFloat(position, pair.rhs); - } - - @Override - public void aggregate(ByteBuffer buf, int position) - { - SerializablePair pair = selector.getObject(); - long lastTime = buf.getLong(position); - if (pair.lhs >= lastTime) { - if (pair.rhs != null) { - updateTimeWithValue(buf, position, pair.lhs); - } else { - updateTimeWithNull(buf, position, pair.lhs); - } - } - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("selector", selector); - } - }; - } - }; + return new FloatLastAggregatorFactory(name, name, timeColumn); } @Override @@ -258,16 +214,16 @@ public Object deserialize(Object object) { Map map = (Map) object; if (map.get("rhs") == null) { - return new SerializablePair<>(((Number) map.get("lhs")).longValue(), null); + return new SerializablePairLongFloat(((Number) map.get("lhs")).longValue(), null); } - return new SerializablePair<>(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).floatValue()); + return new SerializablePairLongFloat(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).floatValue()); } @Override @Nullable public Object finalizeComputation(@Nullable Object object) { - return object == null ? null : ((SerializablePair) object).rhs; + return object == null ? null : ((SerializablePairLongFloat) object).rhs; } @Override @@ -309,7 +265,7 @@ public byte[] getCacheKey() public ColumnType getIntermediateType() { // if we don't pretend to be a primitive, group by v1 gets sad and doesn't work because no complex type serde - return ColumnType.FLOAT; + return TYPE; } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastBufferAggregator.java index 95ad6fe5c5ee5..68affc9cba90e 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastBufferAggregator.java @@ -19,20 +19,22 @@ package org.apache.druid.query.aggregation.last; -import org.apache.druid.collections.SerializablePair; -import org.apache.druid.segment.BaseFloatColumnValueSelector; +import org.apache.druid.query.aggregation.SerializablePairLongFloat; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; import java.nio.ByteBuffer; -public class FloatLastBufferAggregator extends NumericLastBufferAggregator +public class FloatLastBufferAggregator extends NumericLastBufferAggregator { public FloatLastBufferAggregator( BaseLongColumnValueSelector timeSelector, - BaseFloatColumnValueSelector valueSelector + ColumnValueSelector valueSelector, + boolean needsFoldCheck + ) { - super(timeSelector, valueSelector); + super(timeSelector, valueSelector, needsFoldCheck); } @Override @@ -42,16 +44,22 @@ void initValue(ByteBuffer buf, int position) } @Override - void putValue(ByteBuffer buf, int position) + void putValue(ByteBuffer buf, int position, ColumnValueSelector valueSelector) { buf.putFloat(position, valueSelector.getFloat()); } + @Override + void putValue(ByteBuffer buf, int position, Number value) + { + buf.putFloat(position, value.floatValue()); + } + @Override public Object get(ByteBuffer buf, int position) { final boolean rhsNull = isValueNull(buf, position); - return new SerializablePair<>(buf.getLong(position), rhsNull ? null : buf.getFloat(position + VALUE_OFFSET)); + return new SerializablePairLongFloat(buf.getLong(position), rhsNull ? null : buf.getFloat(position + VALUE_OFFSET)); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/GenericLastAggregateCombiner.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/GenericLastAggregateCombiner.java new file mode 100644 index 0000000000000..7ad091199df4c --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/GenericLastAggregateCombiner.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.last; + +import com.google.common.primitives.Longs; +import org.apache.druid.collections.SerializablePair; +import org.apache.druid.query.aggregation.ObjectAggregateCombiner; +import org.apache.druid.segment.ColumnValueSelector; + +import javax.annotation.Nullable; + +public class GenericLastAggregateCombiner> extends ObjectAggregateCombiner +{ + private T lastValue; + private final Class pairClass; + + public GenericLastAggregateCombiner(Class pairClass) + { + this.pairClass = pairClass; + } + + @Override + public void reset(ColumnValueSelector selector) + { + lastValue = (T) selector.getObject(); + } + + @Override + public void fold(ColumnValueSelector selector) + { + T newValue = (T) selector.getObject(); + if (Longs.compare(lastValue.lhs, newValue.lhs) <= 0) { + lastValue = newValue; + } + } + + @Nullable + @Override + public T getObject() + { + return lastValue; + } + + @Override + public Class classOfObject() + { + return pairClass; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregator.java index 59a159d2d8754..b07c3784aaeb0 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregator.java @@ -19,29 +19,36 @@ package org.apache.druid.query.aggregation.last; -import org.apache.druid.collections.SerializablePair; +import org.apache.druid.query.aggregation.SerializablePairLongLong; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; -public class LongLastAggregator extends NumericLastAggregator +public class LongLastAggregator extends NumericLastAggregator { long lastValue; - public LongLastAggregator(BaseLongColumnValueSelector timeSelector, BaseLongColumnValueSelector valueSelector) + public LongLastAggregator(BaseLongColumnValueSelector timeSelector, ColumnValueSelector valueSelector, boolean needsFoldCheck) { - super(timeSelector, valueSelector); + super(timeSelector, valueSelector, needsFoldCheck); lastValue = 0; } @Override - void setCurrentValue() + void setLastValue(ColumnValueSelector valueSelector) { lastValue = valueSelector.getLong(); } + @Override + void setLastValue(Number lastValue) + { + this.lastValue = lastValue.longValue(); + } + @Override public Object get() { - return new SerializablePair<>(lastTime, rhsNull ? null : lastValue); + return new SerializablePairLongLong(lastTime, rhsNull ? null : lastValue); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java index a5304fe10927a..51063bdb5d94a 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java @@ -21,20 +21,21 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; import org.apache.druid.collections.SerializablePair; -import org.apache.druid.java.util.common.UOE; import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.SerializablePairLongLong; +import org.apache.druid.query.aggregation.SerializablePairLongLongComplexMetricSerde; import org.apache.druid.query.aggregation.VectorAggregator; import org.apache.druid.query.aggregation.any.NumericNilVectorAggregator; import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory; +import org.apache.druid.query.aggregation.first.StringFirstLastUtils; import org.apache.druid.query.cache.CacheKeyBuilder; -import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; @@ -55,11 +56,15 @@ import java.util.Map; import java.util.Objects; +@JsonTypeName("longLast") public class LongLastAggregatorFactory extends AggregatorFactory { + public static final ColumnType TYPE = ColumnType.ofComplex(SerializablePairLongLongComplexMetricSerde.TYPE_NAME); + private static final Aggregator NIL_AGGREGATOR = new LongLastAggregator( NilColumnValueSelector.instance(), - NilColumnValueSelector.instance() + NilColumnValueSelector.instance(), + false ) { @Override @@ -71,7 +76,8 @@ public void aggregate() private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new LongLastBufferAggregator( NilColumnValueSelector.instance(), - NilColumnValueSelector.instance() + NilColumnValueSelector.instance(), + false ) { @Override @@ -102,13 +108,18 @@ public LongLastAggregatorFactory( @Override public Aggregator factorize(ColumnSelectorFactory metricFactory) { - final BaseLongColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); + final ColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); if (valueSelector instanceof NilColumnValueSelector) { return NIL_AGGREGATOR; } else { return new LongLastAggregator( metricFactory.makeColumnValueSelector(timeColumn), - valueSelector + valueSelector, + StringFirstLastUtils.selectorNeedsFoldCheck( + valueSelector, + metricFactory.getColumnCapabilities(fieldName), + SerializablePairLongLong.class + ) ); } } @@ -116,13 +127,18 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory) @Override public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { - final BaseLongColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); + final ColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); if (valueSelector instanceof NilColumnValueSelector) { return NIL_BUFFER_AGGREGATOR; } else { return new LongLastBufferAggregator( metricFactory.makeColumnValueSelector(timeColumn), - valueSelector + valueSelector, + StringFirstLastUtils.selectorNeedsFoldCheck( + valueSelector, + metricFactory.getColumnCapabilities(fieldName), + SerializablePairLongLong.class + ) ); } } @@ -177,72 +193,13 @@ public Object combine(@Nullable Object lhs, @Nullable Object rhs) @Override public AggregateCombiner makeAggregateCombiner() { - throw new UOE("LongLastAggregatorFactory is not supported during ingestion for rollup"); + return new GenericLastAggregateCombiner(SerializablePairLongLong.class); } @Override public AggregatorFactory getCombiningFactory() { - return new LongLastAggregatorFactory(name, name, timeColumn) - { - @Override - public Aggregator factorize(ColumnSelectorFactory metricFactory) - { - final ColumnValueSelector> selector = metricFactory.makeColumnValueSelector(name); - return new LongLastAggregator(null, null) - { - @Override - public void aggregate() - { - SerializablePair pair = selector.getObject(); - if (pair.lhs >= lastTime) { - lastTime = pair.lhs; - if (pair.rhs != null) { - lastValue = pair.rhs; - rhsNull = false; - } else { - rhsNull = true; - } - } - } - }; - } - - @Override - public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) - { - final ColumnValueSelector> selector = metricFactory.makeColumnValueSelector(name); - return new LongLastBufferAggregator(null, null) - { - @Override - public void putValue(ByteBuffer buf, int position) - { - SerializablePair pair = selector.getObject(); - buf.putLong(position, pair.rhs); - } - - @Override - public void aggregate(ByteBuffer buf, int position) - { - SerializablePair pair = selector.getObject(); - long lastTime = buf.getLong(position); - if (pair.lhs >= lastTime) { - if (pair.rhs != null) { - updateTimeWithValue(buf, position, pair.lhs); - } else { - updateTimeWithNull(buf, position, pair.lhs); - } - } - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("selector", selector); - } - }; - } - }; + return new LongLastAggregatorFactory(name, name, timeColumn); } @Override @@ -256,16 +213,16 @@ public Object deserialize(Object object) { Map map = (Map) object; if (map.get("rhs") == null) { - return new SerializablePair<>(((Number) map.get("lhs")).longValue(), null); + return new SerializablePairLongLong(((Number) map.get("lhs")).longValue(), null); } - return new SerializablePair<>(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).longValue()); + return new SerializablePairLongLong(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).longValue()); } @Override @Nullable public Object finalizeComputation(@Nullable Object object) { - return object == null ? null : ((SerializablePair) object).rhs; + return object == null ? null : ((SerializablePairLongLong) object).rhs; } @Override @@ -306,7 +263,7 @@ public byte[] getCacheKey() public ColumnType getIntermediateType() { // if we don't pretend to be a primitive, group by v1 gets sad and doesn't work because no complex type serde - return ColumnType.LONG; + return TYPE; } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastBufferAggregator.java index 981ba3e2f6658..a4a318329b774 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastBufferAggregator.java @@ -19,16 +19,17 @@ package org.apache.druid.query.aggregation.last; -import org.apache.druid.collections.SerializablePair; +import org.apache.druid.query.aggregation.SerializablePairLongLong; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; import java.nio.ByteBuffer; -public class LongLastBufferAggregator extends NumericLastBufferAggregator +public class LongLastBufferAggregator extends NumericLastBufferAggregator { - public LongLastBufferAggregator(BaseLongColumnValueSelector timeSelector, BaseLongColumnValueSelector valueSelector) + public LongLastBufferAggregator(BaseLongColumnValueSelector timeSelector, ColumnValueSelector valueSelector, boolean needsFoldCheck) { - super(timeSelector, valueSelector); + super(timeSelector, valueSelector, needsFoldCheck); } @Override @@ -38,16 +39,22 @@ void initValue(ByteBuffer buf, int position) } @Override - void putValue(ByteBuffer buf, int position) + void putValue(ByteBuffer buf, int position, ColumnValueSelector valueSelector) { buf.putLong(position, valueSelector.getLong()); } + @Override + void putValue(ByteBuffer buf, int position, Number value) + { + buf.putLong(position, value.longValue()); + } + @Override public Object get(ByteBuffer buf, int position) { boolean rhsNull = isValueNull(buf, position); - return new SerializablePair<>(buf.getLong(position), rhsNull ? null : buf.getLong(position + VALUE_OFFSET)); + return new SerializablePairLongLong(buf.getLong(position), rhsNull ? null : buf.getLong(position + VALUE_OFFSET)); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java index bc1794f1338e6..8d36d534a8723 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java @@ -19,47 +19,79 @@ package org.apache.druid.query.aggregation.last; +import org.apache.druid.collections.SerializablePair; import org.apache.druid.common.config.NullHandling; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.segment.BaseLongColumnValueSelector; -import org.apache.druid.segment.BaseNullableColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; /** * Base type for on heap 'last' aggregator for primitive numeric column selectors.. * * This could probably share a base class with {@link org.apache.druid.query.aggregation.first.NumericFirstAggregator} */ -public abstract class NumericLastAggregator implements Aggregator +public abstract class NumericLastAggregator implements Aggregator { private final boolean useDefault = NullHandling.replaceWithDefault(); + private final BaseLongColumnValueSelector timeSelector; + final ColumnValueSelector valueSelector; + final boolean needsFoldCheck; - final TSelector valueSelector; long lastTime; boolean rhsNull; - public NumericLastAggregator(BaseLongColumnValueSelector timeSelector, TSelector valueSelector) + public NumericLastAggregator(BaseLongColumnValueSelector timeSelector, ColumnValueSelector valueSelector, boolean needsFoldCheck) { this.timeSelector = timeSelector; this.valueSelector = valueSelector; + this.needsFoldCheck = needsFoldCheck; lastTime = Long.MIN_VALUE; rhsNull = !useDefault; } + /** + * Store the current primitive typed 'last' value + */ + abstract void setLastValue(ColumnValueSelector valueSelector); + + abstract void setLastValue(Number lastValue); + @Override public void aggregate() { if (timeSelector.isNull()) { return; } + + if (needsFoldCheck) { + final Object object = valueSelector.getObject(); + if (object instanceof SerializablePair) { + final SerializablePair inPair = (SerializablePair) object; + + if (inPair != null && inPair.lhs >= lastTime) { + lastTime = inPair.lhs; + + if (inPair.rhs == null) { + rhsNull = true; + } else { + rhsNull = false; + setLastValue(inPair.rhs); + + } + } + return; + } + } long time = timeSelector.getLong(); if (time >= lastTime) { lastTime = time; if (useDefault || !valueSelector.isNull()) { - setCurrentValue(); + setLastValue(valueSelector); rhsNull = false; } else { + setLastValue(0); rhsNull = true; } } @@ -70,9 +102,4 @@ public void close() { // nothing to close } - - /** - * Store the current primitive typed 'last' value - */ - abstract void setCurrentValue(); } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java index 4b741e00dd262..e6f4e6a893b40 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java @@ -19,11 +19,12 @@ package org.apache.druid.query.aggregation.last; +import org.apache.druid.collections.SerializablePair; import org.apache.druid.common.config.NullHandling; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.BaseLongColumnValueSelector; -import org.apache.druid.segment.BaseNullableColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; import java.nio.ByteBuffer; @@ -33,8 +34,7 @@ * This could probably share a base type with * {@link org.apache.druid.query.aggregation.first.NumericFirstBufferAggregator} ... */ -public abstract class NumericLastBufferAggregator - implements BufferAggregator +public abstract class NumericLastBufferAggregator implements BufferAggregator { static final int NULL_OFFSET = Long.BYTES; static final int VALUE_OFFSET = NULL_OFFSET + Byte.BYTES; @@ -42,12 +42,14 @@ public abstract class NumericLastBufferAggregator inPair = (SerializablePair) object; + + if (inPair != null && inPair.lhs >= lastTime) { + if (inPair.rhs == null) { + updateTimeWithNull(buf, position, inPair.lhs); + } else { + updateTimeWithValue(buf, position, inPair.lhs, inPair.rhs); + } + } + return; + } + } + + long time = timeSelector.getLong(); + if (time >= lastTime) { if (useDefault || !valueSelector.isNull()) { - updateTimeWithValue(buf, position, time); + updateTimeWithValue(buf, position, time, valueSelector); } else { updateTimeWithNull(buf, position, time); } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java index e1b39edc4add6..d53d03892903d 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java @@ -126,7 +126,7 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory) metricFactory.makeColumnValueSelector(timeColumn), valueSelector, maxStringBytes, - StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName)) + StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName), SerializablePairLongString.class) ); } } @@ -142,7 +142,7 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) metricFactory.makeColumnValueSelector(timeColumn), valueSelector, maxStringBytes, - StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName)) + StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName), SerializablePairLongString.class) ); } } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregatorFactoryTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregatorFactoryTest.java index 87d0e3dfdd8e3..eb2d99854ae17 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregatorFactoryTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregatorFactoryTest.java @@ -210,21 +210,21 @@ public void testResultArraySignature() .add("longSum", ColumnType.LONG) .add("longMin", ColumnType.LONG) .add("longMax", ColumnType.LONG) - .add("longFirst", ColumnType.LONG) - .add("longLast", ColumnType.LONG) + .add("longFirst", null) + .add("longLast", null) .add("longAny", ColumnType.LONG) .add("doubleSum", ColumnType.DOUBLE) .add("doubleMin", ColumnType.DOUBLE) .add("doubleMax", ColumnType.DOUBLE) - .add("doubleFirst", ColumnType.DOUBLE) - .add("doubleLast", ColumnType.DOUBLE) + .add("doubleFirst", null) + .add("doubleLast", null) .add("doubleAny", ColumnType.DOUBLE) .add("doubleMean", null) .add("floatSum", ColumnType.FLOAT) .add("floatMin", ColumnType.FLOAT) .add("floatMax", ColumnType.FLOAT) - .add("floatFirst", ColumnType.FLOAT) - .add("floatLast", ColumnType.FLOAT) + .add("floatFirst", null) + .add("floatLast", null) .add("floatAny", ColumnType.FLOAT) .add("stringFirst", null) .add("stringLast", null) diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregationTest.java index ebe628ab1215c..7109a0a47961e 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregationTest.java @@ -19,17 +19,20 @@ package org.apache.druid.query.aggregation.first; -import org.apache.druid.collections.SerializablePair; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.SerializablePairLongDouble; import org.apache.druid.query.aggregation.TestDoubleColumnSelectorImpl; import org.apache.druid.query.aggregation.TestLongColumnSelector; import org.apache.druid.query.aggregation.TestObjectColumnSelector; import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.testing.InitializedNullHandlingTest; import org.easymock.EasyMock; import org.junit.Assert; @@ -52,11 +55,11 @@ public class DoubleFirstAggregationTest extends InitializedNullHandlingTest private double[] doubleValues = {1.1d, 2.7d, 3.5d, 1.3d}; private long[] times = {12, 10, 5344, 7899999}; private long[] customTimes = {2, 1, 3, 4}; - private SerializablePair[] pairs = { - new SerializablePair<>(1467225096L, 134.3d), - new SerializablePair<>(23163L, 1232.212d), - new SerializablePair<>(742L, 18d), - new SerializablePair<>(111111L, 233.5232d) + private SerializablePairLongDouble[] pairs = { + new SerializablePairLongDouble(1467225096L, 134.3d), + new SerializablePairLongDouble(23163L, 1232.212d), + new SerializablePairLongDouble(742L, 18d), + new SerializablePairLongDouble(111111L, 233.5232d) }; @Before @@ -73,6 +76,10 @@ public void setup() EasyMock.expect(colSelectorFactory.makeColumnValueSelector("customTime")).andReturn(customTimeSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")) + .andReturn(new ColumnCapabilitiesImpl().setType(ColumnType.DOUBLE)); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(null); + EasyMock.replay(colSelectorFactory); } @@ -158,16 +165,16 @@ public void testDoubleFirstBufferAggregatorWithTimeColumn() @Test public void testCombine() { - SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621); - SerializablePair pair2 = new SerializablePair<>(1467240000L, 785.4); + SerializablePairLongDouble pair1 = new SerializablePairLongDouble(1467225000L, 3.621); + SerializablePairLongDouble pair2 = new SerializablePairLongDouble(1467240000L, 785.4); Assert.assertEquals(pair1, doubleFirstAggFactory.combine(pair1, pair2)); } @Test public void testComparator() { - SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621); - SerializablePair pair2 = new SerializablePair<>(1467240000L, 785.4); + SerializablePairLongDouble pair1 = new SerializablePairLongDouble(1467225000L, 3.621); + SerializablePairLongDouble pair2 = new SerializablePairLongDouble(1467240000L, 785.4); Comparator comparator = doubleFirstAggFactory.getComparator(); Assert.assertEquals(-1, comparator.compare(pair1, pair2)); Assert.assertEquals(0, comparator.compare(pair1, pair1)); @@ -178,8 +185,8 @@ public void testComparator() @Test public void testComparatorWithNulls() { - SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621); - SerializablePair pair2 = new SerializablePair<>(1467240000L, null); + SerializablePairLongDouble pair1 = new SerializablePairLongDouble(1467225000L, 3.621); + SerializablePairLongDouble pair2 = new SerializablePairLongDouble(1467240000L, null); Comparator comparator = doubleFirstAggFactory.getComparator(); Assert.assertEquals(1, comparator.compare(pair1, pair2)); Assert.assertEquals(0, comparator.compare(pair1, pair1)); @@ -241,6 +248,28 @@ public void testSerde() throws Exception Assert.assertArrayEquals(doubleFirstAggFactory.getCacheKey(), deserialized.getCacheKey()); } + @Test + public void testDoubleFirstAggregateCombiner() + { + TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(pairs); + AggregateCombiner doubleFirstAggregateCombiner = combiningAggFactory.makeAggregateCombiner(); + doubleFirstAggregateCombiner.reset(columnSelector); + + Assert.assertEquals(pairs[0], doubleFirstAggregateCombiner.getObject()); + + columnSelector.increment(); + doubleFirstAggregateCombiner.fold(columnSelector); + Assert.assertEquals(pairs[1], doubleFirstAggregateCombiner.getObject()); + + columnSelector.increment(); + doubleFirstAggregateCombiner.fold(columnSelector); + Assert.assertEquals(pairs[2], doubleFirstAggregateCombiner.getObject()); + + doubleFirstAggregateCombiner.reset(columnSelector); + Assert.assertEquals(pairs[2], doubleFirstAggregateCombiner.getObject()); + } + + private void aggregate( Aggregator agg ) diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstAggregationTest.java index bd9bc11d2f1e7..0df142677437c 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstAggregationTest.java @@ -19,17 +19,20 @@ package org.apache.druid.query.aggregation.first; -import org.apache.druid.collections.SerializablePair; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.SerializablePairLongFloat; import org.apache.druid.query.aggregation.TestFloatColumnSelector; import org.apache.druid.query.aggregation.TestLongColumnSelector; import org.apache.druid.query.aggregation.TestObjectColumnSelector; import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.testing.InitializedNullHandlingTest; import org.easymock.EasyMock; import org.junit.Assert; @@ -52,11 +55,11 @@ public class FloatFirstAggregationTest extends InitializedNullHandlingTest private float[] floats = {1.1f, 2.7f, 3.5f, 1.3f}; private long[] times = {12, 10, 5344, 7899999}; private long[] customTimes = {2, 1, 3, 4}; - private SerializablePair[] pairs = { - new SerializablePair<>(1467225096L, 134.3f), - new SerializablePair<>(23163L, 1232.212f), - new SerializablePair<>(742L, 18f), - new SerializablePair<>(111111L, 233.5232f) + private SerializablePairLongFloat[] pairs = { + new SerializablePairLongFloat(1467225096L, 134.3f), + new SerializablePairLongFloat(23163L, 1232.212f), + new SerializablePairLongFloat(742L, 18f), + new SerializablePairLongFloat(111111L, 233.5232f) }; @Before @@ -73,6 +76,10 @@ public void setup() EasyMock.expect(colSelectorFactory.makeColumnValueSelector("customTime")).andReturn(customTimeSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector).atLeastOnce(); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector).atLeastOnce(); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")) + .andReturn(new ColumnCapabilitiesImpl().setType(ColumnType.FLOAT)); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(null); + EasyMock.replay(colSelectorFactory); } @@ -158,16 +165,16 @@ public void testFloatFirstBufferAggregatorWithTimeColumn() @Test public void testCombine() { - SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621f); - SerializablePair pair2 = new SerializablePair<>(1467240000L, 785.4f); + SerializablePairLongFloat pair1 = new SerializablePairLongFloat(1467225000L, 3.621f); + SerializablePairLongFloat pair2 = new SerializablePairLongFloat(1467240000L, 785.4f); Assert.assertEquals(pair1, floatFirstAggregatorFactory.combine(pair1, pair2)); } @Test public void testComparatorWithNulls() { - SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621f); - SerializablePair pair2 = new SerializablePair<>(1467240000L, null); + SerializablePairLongFloat pair1 = new SerializablePairLongFloat(1467225000L, 3.621f); + SerializablePairLongFloat pair2 = new SerializablePairLongFloat(1467240000L, null); Comparator comparator = floatFirstAggregatorFactory.getComparator(); Assert.assertEquals(1, comparator.compare(pair1, pair2)); Assert.assertEquals(0, comparator.compare(pair1, pair1)); @@ -228,6 +235,28 @@ public void testSerde() throws Exception Assert.assertArrayEquals(floatFirstAggregatorFactory.getCacheKey(), deserialized.getCacheKey()); } + @Test + public void testFloatFirstAggregateCombiner() + { + TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(pairs); + AggregateCombiner floatFirstAggregateCombiner = combiningAggFactory.makeAggregateCombiner(); + floatFirstAggregateCombiner.reset(columnSelector); + + Assert.assertEquals(pairs[0], floatFirstAggregateCombiner.getObject()); + + columnSelector.increment(); + floatFirstAggregateCombiner.fold(columnSelector); + Assert.assertEquals(pairs[1], floatFirstAggregateCombiner.getObject()); + + columnSelector.increment(); + floatFirstAggregateCombiner.fold(columnSelector); + Assert.assertEquals(pairs[2], floatFirstAggregateCombiner.getObject()); + + floatFirstAggregateCombiner.reset(columnSelector); + Assert.assertEquals(pairs[2], floatFirstAggregateCombiner.getObject()); + } + + private void aggregate( Aggregator agg ) diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstAggregationTest.java index d73eb5511e947..104dbcfa60dde 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstAggregationTest.java @@ -19,16 +19,19 @@ package org.apache.druid.query.aggregation.first; -import org.apache.druid.collections.SerializablePair; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.SerializablePairLongLong; import org.apache.druid.query.aggregation.TestLongColumnSelector; import org.apache.druid.query.aggregation.TestObjectColumnSelector; import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.testing.InitializedNullHandlingTest; import org.easymock.EasyMock; import org.junit.Assert; @@ -51,11 +54,11 @@ public class LongFirstAggregationTest extends InitializedNullHandlingTest private long[] longValues = {185, -216, -128751132, Long.MIN_VALUE}; private long[] times = {1123126751, 1784247991, 1854329816, 1000000000}; private long[] customTimes = {2, 1, 3, 4}; - private SerializablePair[] pairs = { - new SerializablePair<>(1L, 113267L), - new SerializablePair<>(1L, 5437384L), - new SerializablePair<>(6L, 34583458L), - new SerializablePair<>(88L, 34583452L) + private SerializablePairLongLong[] pairs = { + new SerializablePairLongLong(1L, 113267L), + new SerializablePairLongLong(1L, 5437384L), + new SerializablePairLongLong(6L, 34583458L), + new SerializablePairLongLong(88L, 34583452L) }; @Before @@ -72,6 +75,9 @@ public void setup() EasyMock.expect(colSelectorFactory.makeColumnValueSelector("customTime")).andReturn(customTimeSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")) + .andReturn(new ColumnCapabilitiesImpl().setType(ColumnType.LONG)); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(null); EasyMock.replay(colSelectorFactory); } @@ -157,16 +163,16 @@ public void testLongFirstBufferAggregatorWithTimeColumn() @Test public void testCombine() { - SerializablePair pair1 = new SerializablePair<>(1467225000L, 1263L); - SerializablePair pair2 = new SerializablePair<>(1467240000L, 752713L); + SerializablePairLongLong pair1 = new SerializablePairLongLong(1467225000L, 1263L); + SerializablePairLongLong pair2 = new SerializablePairLongLong(1467240000L, 752713L); Assert.assertEquals(pair1, longFirstAggFactory.combine(pair1, pair2)); } @Test public void testComparatorWithNulls() { - SerializablePair pair1 = new SerializablePair<>(1467225000L, 1263L); - SerializablePair pair2 = new SerializablePair<>(1467240000L, null); + SerializablePairLongLong pair1 = new SerializablePairLongLong(1467225000L, 1263L); + SerializablePairLongLong pair2 = new SerializablePairLongLong(1467240000L, null); Comparator comparator = longFirstAggFactory.getComparator(); Assert.assertEquals(1, comparator.compare(pair1, pair2)); Assert.assertEquals(0, comparator.compare(pair1, pair1)); @@ -227,6 +233,27 @@ public void testSerde() throws Exception Assert.assertArrayEquals(longFirstAggFactory.getCacheKey(), deserialized.getCacheKey()); } + @Test + public void testLongFirstAggregeCombiner() + { + TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(pairs); + AggregateCombiner longFirstAggregateCombiner = combiningAggFactory.makeAggregateCombiner(); + longFirstAggregateCombiner.reset(columnSelector); + + Assert.assertEquals(pairs[0], longFirstAggregateCombiner.getObject()); + + columnSelector.increment(); + longFirstAggregateCombiner.fold(columnSelector); + Assert.assertEquals(pairs[0], longFirstAggregateCombiner.getObject()); + + columnSelector.increment(); + longFirstAggregateCombiner.fold(columnSelector); + Assert.assertEquals(pairs[0], longFirstAggregateCombiner.getObject()); + + longFirstAggregateCombiner.reset(columnSelector); + Assert.assertEquals(pairs[2], longFirstAggregateCombiner.getObject()); + } + private void aggregate( Aggregator agg ) diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/DoubleLastAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/DoubleLastAggregationTest.java index 3648ddc5fea68..bbcf9be37a2be 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/last/DoubleLastAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/DoubleLastAggregationTest.java @@ -19,17 +19,20 @@ package org.apache.druid.query.aggregation.last; -import org.apache.druid.collections.SerializablePair; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.SerializablePairLongDouble; import org.apache.druid.query.aggregation.TestDoubleColumnSelectorImpl; import org.apache.druid.query.aggregation.TestLongColumnSelector; import org.apache.druid.query.aggregation.TestObjectColumnSelector; import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.testing.InitializedNullHandlingTest; import org.easymock.EasyMock; import org.junit.Assert; @@ -52,11 +55,11 @@ public class DoubleLastAggregationTest extends InitializedNullHandlingTest private double[] doubles = {1.1897d, 0.001d, 86.23d, 166.228d}; private long[] times = {8224, 6879, 2436, 7888}; private long[] customTimes = {1, 4, 3, 2}; - private SerializablePair[] pairs = { - new SerializablePair<>(52782L, 134.3d), - new SerializablePair<>(65492L, 1232.212d), - new SerializablePair<>(69134L, 18.1233d), - new SerializablePair<>(11111L, 233.5232d) + private SerializablePairLongDouble[] pairs = { + new SerializablePairLongDouble(52782L, 134.3d), + new SerializablePairLongDouble(65492L, 1232.212d), + new SerializablePairLongDouble(69134L, 18.1233d), + new SerializablePairLongDouble(11111L, 233.5232d) }; @Before @@ -73,6 +76,9 @@ public void setup() EasyMock.expect(colSelectorFactory.makeColumnValueSelector("customTime")).andReturn(customTimeSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")) + .andReturn(new ColumnCapabilitiesImpl().setType(ColumnType.DOUBLE)); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(null); EasyMock.replay(colSelectorFactory); } @@ -159,16 +165,16 @@ public void testDoubleLastBufferAggregatorWithTimeColumn() @Test public void testCombine() { - SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621); - SerializablePair pair2 = new SerializablePair<>(1467240000L, 785.4); + SerializablePairLongDouble pair1 = new SerializablePairLongDouble(1467225000L, 3.621); + SerializablePairLongDouble pair2 = new SerializablePairLongDouble(1467240000L, 785.4); Assert.assertEquals(pair2, doubleLastAggFactory.combine(pair1, pair2)); } @Test public void testComparatorWithNulls() { - SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621); - SerializablePair pair2 = new SerializablePair<>(1467240000L, null); + SerializablePairLongDouble pair1 = new SerializablePairLongDouble(1467225000L, 3.621); + SerializablePairLongDouble pair2 = new SerializablePairLongDouble(1467240000L, null); Comparator comparator = doubleLastAggFactory.getComparator(); Assert.assertEquals(1, comparator.compare(pair1, pair2)); Assert.assertEquals(0, comparator.compare(pair1, pair1)); @@ -229,6 +235,32 @@ public void testSerde() throws Exception Assert.assertArrayEquals(doubleLastAggFactory.getCacheKey(), deserialized.getCacheKey()); } + @Test + public void testDoubleLastAggregateCombiner() + { + TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(pairs); + AggregateCombiner doubleLastAggregateCombiner = combiningAggFactory.makeAggregateCombiner(); + doubleLastAggregateCombiner.reset(columnSelector); + + Assert.assertEquals(pairs[0], doubleLastAggregateCombiner.getObject()); + + columnSelector.increment(); + doubleLastAggregateCombiner.fold(columnSelector); + Assert.assertEquals(pairs[1], doubleLastAggregateCombiner.getObject()); + + columnSelector.increment(); + doubleLastAggregateCombiner.fold(columnSelector); + Assert.assertEquals(pairs[2], doubleLastAggregateCombiner.getObject()); + + columnSelector.increment(); + doubleLastAggregateCombiner.fold(columnSelector); + Assert.assertEquals(pairs[2], doubleLastAggregateCombiner.getObject()); + + doubleLastAggregateCombiner.reset(columnSelector); + Assert.assertEquals(pairs[3], doubleLastAggregateCombiner.getObject()); + } + + private void aggregate( Aggregator agg ) diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/FloatLastAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/FloatLastAggregationTest.java index c1e148abe52cf..7030f6671e1bc 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/last/FloatLastAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/FloatLastAggregationTest.java @@ -19,17 +19,20 @@ package org.apache.druid.query.aggregation.last; -import org.apache.druid.collections.SerializablePair; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.SerializablePairLongFloat; import org.apache.druid.query.aggregation.TestFloatColumnSelector; import org.apache.druid.query.aggregation.TestLongColumnSelector; import org.apache.druid.query.aggregation.TestObjectColumnSelector; import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.testing.InitializedNullHandlingTest; import org.easymock.EasyMock; import org.junit.Assert; @@ -52,11 +55,11 @@ public class FloatLastAggregationTest extends InitializedNullHandlingTest private float[] floats = {1.1897f, 0.001f, 86.23f, 166.228f}; private long[] times = {8224, 6879, 2436, 7888}; private long[] customTimes = {1, 4, 3, 2}; - private SerializablePair[] pairs = { - new SerializablePair<>(52782L, 134.3f), - new SerializablePair<>(65492L, 1232.212f), - new SerializablePair<>(69134L, 18.1233f), - new SerializablePair<>(11111L, 233.5232f) + private SerializablePairLongFloat[] pairs = { + new SerializablePairLongFloat(52782L, 134.3f), + new SerializablePairLongFloat(65492L, 1232.212f), + new SerializablePairLongFloat(69134L, 18.1233f), + new SerializablePairLongFloat(11111L, 233.5232f) }; @Before @@ -73,6 +76,9 @@ public void setup() EasyMock.expect(colSelectorFactory.makeColumnValueSelector("customTime")).andReturn(customTimeSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")) + .andReturn(new ColumnCapabilitiesImpl().setType(ColumnType.FLOAT)); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(null); EasyMock.replay(colSelectorFactory); } @@ -159,16 +165,16 @@ public void testFloatLastBufferAggregatorWithTimeColumn() @Test public void testCombine() { - SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621f); - SerializablePair pair2 = new SerializablePair<>(1467240000L, 785.4f); + SerializablePairLongFloat pair1 = new SerializablePairLongFloat(1467225000L, 3.621f); + SerializablePairLongFloat pair2 = new SerializablePairLongFloat(1467240000L, 785.4f); Assert.assertEquals(pair2, floatLastAggregatorFactory.combine(pair1, pair2)); } @Test public void testComparatorWithNulls() { - SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621f); - SerializablePair pair2 = new SerializablePair<>(1467240000L, null); + SerializablePairLongFloat pair1 = new SerializablePairLongFloat(1467225000L, 3.621f); + SerializablePairLongFloat pair2 = new SerializablePairLongFloat(1467240000L, null); Comparator comparator = floatLastAggregatorFactory.getComparator(); Assert.assertEquals(1, comparator.compare(pair1, pair2)); Assert.assertEquals(0, comparator.compare(pair1, pair1)); @@ -229,6 +235,31 @@ public void testSerde() throws Exception Assert.assertArrayEquals(floatLastAggregatorFactory.getCacheKey(), deserialized.getCacheKey()); } + @Test + public void testFloatLastAggregateCombiner() + { + TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(pairs); + AggregateCombiner floatLastAggregateCombiner = combiningAggFactory.makeAggregateCombiner(); + floatLastAggregateCombiner.reset(columnSelector); + + Assert.assertEquals(pairs[0], floatLastAggregateCombiner.getObject()); + + columnSelector.increment(); + floatLastAggregateCombiner.fold(columnSelector); + Assert.assertEquals(pairs[1], floatLastAggregateCombiner.getObject()); + + columnSelector.increment(); + floatLastAggregateCombiner.fold(columnSelector); + Assert.assertEquals(pairs[2], floatLastAggregateCombiner.getObject()); + + columnSelector.increment(); + floatLastAggregateCombiner.fold(columnSelector); + Assert.assertEquals(pairs[2], floatLastAggregateCombiner.getObject()); + + floatLastAggregateCombiner.reset(columnSelector); + Assert.assertEquals(pairs[3], floatLastAggregateCombiner.getObject()); + } + private void aggregate( Aggregator agg ) diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/LongLastAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/LongLastAggregationTest.java index cacd905ee0547..111f868842514 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/last/LongLastAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/LongLastAggregationTest.java @@ -19,16 +19,19 @@ package org.apache.druid.query.aggregation.last; -import org.apache.druid.collections.SerializablePair; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.SerializablePairLongLong; import org.apache.druid.query.aggregation.TestLongColumnSelector; import org.apache.druid.query.aggregation.TestObjectColumnSelector; import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.testing.InitializedNullHandlingTest; import org.easymock.EasyMock; import org.junit.Assert; @@ -51,11 +54,12 @@ public class LongLastAggregationTest extends InitializedNullHandlingTest private long[] longValues = {23216, 8635, 1547123, Long.MAX_VALUE}; private long[] times = {1467935723, 1467225653, 1601848932, 72515}; private long[] customTimes = {1, 4, 3, 2}; - private SerializablePair[] pairs = { - new SerializablePair<>(12531L, 113267L), - new SerializablePair<>(123L, 5437384L), - new SerializablePair<>(125755L, 34583458L), - new SerializablePair<>(124L, 34283452L) + private SerializablePairLongLong[] pairs = { + new SerializablePairLongLong(12531L, 113267L), + new SerializablePairLongLong(12534L, null), + new SerializablePairLongLong(123L, 5437384L), + new SerializablePairLongLong(125755L, 34583458L), + new SerializablePairLongLong(124L, 34283452L) }; @Before @@ -72,6 +76,9 @@ public void setup() EasyMock.expect(colSelectorFactory.makeColumnValueSelector("customTime")).andReturn(customTimeSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")) + .andReturn(new ColumnCapabilitiesImpl().setType(ColumnType.LONG)); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(null); EasyMock.replay(colSelectorFactory); } @@ -158,16 +165,16 @@ public void testLongLastBufferAggregatorWithTimeColumn() @Test public void testCombine() { - SerializablePair pair1 = new SerializablePair<>(1467225000L, 64432L); - SerializablePair pair2 = new SerializablePair<>(1467240000L, 99999L); + SerializablePairLongLong pair1 = new SerializablePairLongLong(1467225000L, 64432L); + SerializablePairLongLong pair2 = new SerializablePairLongLong(1467240000L, 99999L); Assert.assertEquals(pair2, longLastAggFactory.combine(pair1, pair2)); } @Test public void testComparatorWithNulls() { - SerializablePair pair1 = new SerializablePair<>(1467225000L, 1263L); - SerializablePair pair2 = new SerializablePair<>(1467240000L, null); + SerializablePairLongLong pair1 = new SerializablePairLongLong(1467225000L, 1263L); + SerializablePairLongLong pair2 = new SerializablePairLongLong(1467240000L, null); Comparator comparator = longLastAggFactory.getComparator(); Assert.assertEquals(1, comparator.compare(pair1, pair2)); Assert.assertEquals(0, comparator.compare(pair1, pair1)); @@ -186,7 +193,7 @@ public void testLongLastCombiningAggregator() aggregate(agg); Pair result = (Pair) agg.get(); - Pair expected = (Pair) pairs[2]; + Pair expected = (Pair) pairs[3]; Assert.assertEquals(expected.lhs, result.lhs); Assert.assertEquals(expected.rhs, result.rhs); @@ -209,7 +216,7 @@ public void testLongLastCombiningBufferAggregator() aggregate(agg, buffer, 0); Pair result = (Pair) agg.get(buffer, 0); - Pair expected = (Pair) pairs[2]; + Pair expected = (Pair) pairs[3]; Assert.assertEquals(expected.lhs, result.lhs); Assert.assertEquals(expected.rhs, result.rhs); @@ -228,6 +235,27 @@ public void testSerde() throws Exception Assert.assertArrayEquals(longLastAggFactory.getCacheKey(), deserialized.getCacheKey()); } + @Test + public void testLongLastAggregateCombiner() + { + TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(pairs); + AggregateCombiner longLastAggregateCombiner = combiningAggFactory.makeAggregateCombiner(); + longLastAggregateCombiner.reset(columnSelector); + + Assert.assertEquals(pairs[0], longLastAggregateCombiner.getObject()); + + columnSelector.increment(); + longLastAggregateCombiner.fold(columnSelector); + Assert.assertEquals(pairs[1], longLastAggregateCombiner.getObject()); + + columnSelector.increment(); + longLastAggregateCombiner.fold(columnSelector); + Assert.assertEquals(pairs[1], longLastAggregateCombiner.getObject()); + + longLastAggregateCombiner.reset(columnSelector); + Assert.assertEquals(pairs[2], longLastAggregateCombiner.getObject()); + } + private void aggregate( Aggregator agg ) diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java index 2a700c08bd48a..b5a92bf7e67e7 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java @@ -39,6 +39,9 @@ import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.FloatSumAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.aggregation.SerializablePairLongDouble; +import org.apache.druid.query.aggregation.SerializablePairLongFloat; +import org.apache.druid.query.aggregation.SerializablePairLongLong; import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory; import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory; @@ -951,9 +954,11 @@ private SerializablePair getIntermediateComplexValue(final ValueType valueType, { switch (valueType) { case LONG: + return new SerializablePairLongLong(123L, (long) dimValue); case DOUBLE: + return new SerializablePairLongDouble(123L, (double) dimValue); case FLOAT: - return new SerializablePair<>(123L, dimValue); + return new SerializablePairLongFloat(123L, (float) dimValue); case STRING: return new SerializablePairLongString(123L, (String) dimValue); default: diff --git a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java index f5b5111ca55b7..89f7d60453e62 100644 --- a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java @@ -43,6 +43,9 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.aggregation.SerializablePairLongDouble; +import org.apache.druid.query.aggregation.SerializablePairLongFloat; +import org.apache.druid.query.aggregation.SerializablePairLongLong; import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.aggregation.cardinality.CardinalityAggregator; import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; @@ -388,9 +391,11 @@ private SerializablePair getIntermediateComplexValue(final ValueType valueType, { switch (valueType) { case LONG: + return new SerializablePairLongLong(123L, (long) dimValue); case DOUBLE: + return new SerializablePairLongDouble(123L, (double) dimValue); case FLOAT: - return new SerializablePair<>(123L, dimValue); + return new SerializablePairLongFloat(123L, (float) dimValue); case STRING: return new SerializablePairLongString(123L, (String) dimValue); default: diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerRollupTest.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerRollupTest.java index fd99ae96e9448..825edaeccfeee 100644 --- a/processing/src/test/java/org/apache/druid/segment/IndexMergerRollupTest.java +++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerRollupTest.java @@ -22,7 +22,13 @@ import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory; +import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory; +import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory; +import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory; +import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory; +import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory; import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory; import org.apache.druid.segment.data.IncrementalIndexTest; import org.apache.druid.segment.incremental.IncrementalIndex; @@ -61,7 +67,7 @@ public void setUp() indexSpec = IndexSpec.DEFAULT; } - private void testStringFirstLastRollup( + private void testFirstLastRollup( AggregatorFactory[] aggregatorFactories ) throws Exception { @@ -71,6 +77,9 @@ private void testStringFirstLastRollup( { put("d", "d1"); put("m", "m1"); + put("l", 123L); + put("f", 123.0F); + put("dl", 123.5D); } }, new HashMap() @@ -78,6 +87,9 @@ private void testStringFirstLastRollup( { put("d", "d1"); put("m", "m2"); + put("l", 124L); + put("f", 124.0F); + put("dl", 124.5D); } } ); @@ -102,20 +114,26 @@ private void testStringFirstLastRollup( } @Test - public void testStringFirstRollup() throws Exception + public void testFirstRollup() throws Exception { AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{ - new StringFirstAggregatorFactory("m", "m", null, 1024) + new StringFirstAggregatorFactory("m", "m", null, 1024), + new LongFirstAggregatorFactory("l", "l", null), + new FloatFirstAggregatorFactory("f", "f", null), + new DoubleFirstAggregatorFactory("dl", "dl", null), }; - testStringFirstLastRollup(aggregatorFactories); + testFirstLastRollup(aggregatorFactories); } @Test - public void testStringLastRollup() throws Exception + public void testLastRollup() throws Exception { AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{ - new StringLastAggregatorFactory("m", "m", null, 1024) + new StringLastAggregatorFactory("m", "m", null, 1024), + new LongLastAggregatorFactory("l", "l", null), + new FloatLastAggregatorFactory("f", "f", null), + new DoubleLastAggregatorFactory("dl", "dl", null), }; - testStringFirstLastRollup(aggregatorFactories); + testFirstLastRollup(aggregatorFactories); } }