From b2a88da200bb435e5e231d50c97177fcb9b8d3f6 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 25 Jul 2024 08:45:29 -0700 Subject: [PATCH] Attempt to coerce COMPLEX to number in numeric aggregators. (#16564) * Coerce COMPLEX to number in numeric aggregators. PR #15371 eliminated ObjectColumnSelector's built-in implementations of numeric methods, which had been marked deprecated. However, some complex types, like SpectatorHistogram, can be successfully coerced to number. The documentation for spectator histograms encourages taking advantage of this by aggregating complex columns with doubleSum and longSum. Currently, this doesn't work properly for IncrementalIndex, where the behavior relied on those deprecated ObjectColumnSelector methods. This patch fixes the behavior by making two changes: 1) SimpleXYZAggregatorFactory (XYZ = type; base class for simple numeric aggregators; all of these extend NullableNumericAggregatorFactory) use getObject for STRING and COMPLEX. Previously, getObject was only used for STRING. 2) NullableNumericAggregatorFactory (base class for simple numeric aggregators) has a new protected method "useGetObject". This allows the base class to correctly check for null (using getObject or isNull). The patch also adds a test for SpectatorHistogram + doubleSum + IncrementalIndex. * Fix tests. * Remove the special ColumnValueSelector. * Add test. --- .../spectator-histogram/pom.xml | 10 +++ ...tatorHistogramIndexBasedComplexColumn.java | 69 +------------------ .../SpectatorHistogramAggregatorTest.java | 67 ++++++++++++++++++ ...rHistogramIndexBasedComplexColumnTest.java | 42 +++++++++++ .../query/aggregation/AggregatorUtil.java | 26 +++++++ .../aggregation/DelegatingAggregator.java | 2 +- .../DelegatingBufferAggregator.java | 2 +- .../NullableNumericAggregatorFactory.java | 46 +++++++++++-- ... 
ObjectColumnDoubleAggregatorWrapper.java} | 6 +- ...tColumnDoubleBufferAggregatorWrapper.java} | 6 +- ...> ObjectColumnFloatAggregatorWrapper.java} | 6 +- ...ctColumnFloatBufferAggregatorWrapper.java} | 6 +- ...=> ObjectColumnLongAggregatorWrapper.java} | 6 +- ...ectColumnLongBufferAggregatorWrapper.java} | 6 +- .../SimpleDoubleAggregatorFactory.java | 20 ++---- .../SimpleFloatAggregatorFactory.java | 26 +++---- .../SimpleLongAggregatorFactory.java | 26 +++---- .../aggregation/DoubleMaxAggregationTest.java | 2 +- .../aggregation/DoubleMinAggregationTest.java | 2 +- .../aggregation/LongMaxAggregationTest.java | 2 +- .../aggregation/LongMinAggregationTest.java | 2 +- 21 files changed, 236 insertions(+), 144 deletions(-) create mode 100644 extensions-contrib/spectator-histogram/src/test/java/org/apache/druid/spectator/histogram/SpectatorHistogramIndexBasedComplexColumnTest.java rename processing/src/main/java/org/apache/druid/query/aggregation/{StringColumnDoubleAggregatorWrapper.java => ObjectColumnDoubleAggregatorWrapper.java} (93%) rename processing/src/main/java/org/apache/druid/query/aggregation/{StringColumnDoubleBufferAggregatorWrapper.java => ObjectColumnDoubleBufferAggregatorWrapper.java} (93%) rename processing/src/main/java/org/apache/druid/query/aggregation/{StringColumnFloatAggregatorWrapper.java => ObjectColumnFloatAggregatorWrapper.java} (93%) rename processing/src/main/java/org/apache/druid/query/aggregation/{StringColumnFloatBufferAggregatorWrapper.java => ObjectColumnFloatBufferAggregatorWrapper.java} (93%) rename processing/src/main/java/org/apache/druid/query/aggregation/{StringColumnLongAggregatorWrapper.java => ObjectColumnLongAggregatorWrapper.java} (93%) rename processing/src/main/java/org/apache/druid/query/aggregation/{StringColumnLongBufferAggregatorWrapper.java => ObjectColumnLongBufferAggregatorWrapper.java} (93%) diff --git a/extensions-contrib/spectator-histogram/pom.xml b/extensions-contrib/spectator-histogram/pom.xml index 
476e562bf4fa..fd6fff411aad 100644 --- a/extensions-contrib/spectator-histogram/pom.xml +++ b/extensions-contrib/spectator-histogram/pom.xml @@ -92,6 +92,11 @@ error_prone_annotations provided + + joda-time + joda-time + provided + org.apache.druid druid-sql @@ -137,5 +142,10 @@ test-jar test + + org.easymock + easymock + test + diff --git a/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramIndexBasedComplexColumn.java b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramIndexBasedComplexColumn.java index 2e54fcf0d45e..8965b595e59f 100644 --- a/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramIndexBasedComplexColumn.java +++ b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramIndexBasedComplexColumn.java @@ -19,18 +19,12 @@ package org.apache.druid.spectator.histogram; -import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.column.ComplexColumn; -import org.apache.druid.segment.data.ReadableOffset; - -import javax.annotation.Nullable; public class SpectatorHistogramIndexBasedComplexColumn implements ComplexColumn { private final SpectatorHistogramIndexed index; private final String typeName; - private static final Number ZERO = 0; public SpectatorHistogramIndexBasedComplexColumn(String typeName, SpectatorHistogramIndexed index) { @@ -59,72 +53,11 @@ public Object getRowValue(int rowNum) @Override public int getLength() { - return index.size(); + return -1; } @Override public void close() { } - - @Override - public ColumnValueSelector makeColumnValueSelector(ReadableOffset offset) - { - // Use ColumnValueSelector directly so that we support being queried as a Number using - // longSum or doubleSum aggregators, the 
NullableNumericBufferAggregator will call isNull. - // This allows us to behave as a Number or SpectatorHistogram object. - // When queried as a Number, we're returning the count of entries in the histogram. - // As such, we can safely return 0 where the histogram is null. - return new ColumnValueSelector() - { - @Override - public boolean isNull() - { - return getObject() == null; - } - - private Number getOrZero() - { - SpectatorHistogram histogram = getObject(); - return histogram != null ? histogram : ZERO; - } - - @Override - public long getLong() - { - return getOrZero().longValue(); - } - - @Override - public float getFloat() - { - return getOrZero().floatValue(); - } - - @Override - public double getDouble() - { - return getOrZero().doubleValue(); - } - - @Nullable - @Override - public SpectatorHistogram getObject() - { - return (SpectatorHistogram) getRowValue(offset.getOffset()); - } - - @Override - public Class classOfObject() - { - return getClazz(); - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("column", SpectatorHistogramIndexBasedComplexColumn.this); - } - }; - } } diff --git a/extensions-contrib/spectator-histogram/src/test/java/org/apache/druid/spectator/histogram/SpectatorHistogramAggregatorTest.java b/extensions-contrib/spectator-histogram/src/test/java/org/apache/druid/spectator/histogram/SpectatorHistogramAggregatorTest.java index 1c30cfc05c36..88e710645d1e 100644 --- a/extensions-contrib/spectator-histogram/src/test/java/org/apache/druid/spectator/histogram/SpectatorHistogramAggregatorTest.java +++ b/extensions-contrib/spectator-histogram/src/test/java/org/apache/druid/spectator/histogram/SpectatorHistogramAggregatorTest.java @@ -20,8 +20,14 @@ package org.apache.druid.spectator.histogram; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import 
com.netflix.spectator.api.histogram.PercentileBuckets; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.data.input.impl.NoopInputRowParser; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.query.Druids; @@ -32,6 +38,9 @@ import org.apache.druid.query.aggregation.AggregationTestHelper; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; +import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByQueryRunnerTest; import org.apache.druid.query.groupby.ResultRow; @@ -42,13 +51,17 @@ import org.apache.druid.query.metadata.metadata.SegmentAnalysis; import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery; import org.apache.druid.query.timeseries.TimeseriesResultValue; +import org.apache.druid.segment.IncrementalIndexSegment; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; +import org.apache.druid.segment.Segment; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.column.ColumnConfig; +import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.timeline.SegmentId; +import org.joda.time.DateTime; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -59,6 +72,7 @@ import java.io.File; import java.util.ArrayList; import java.util.Collection; +import 
java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -716,6 +730,59 @@ public void testPercentilePostAggregator() throws Exception } } + @Test + public void testBuildingAndCountingHistogramsIncrementalIndex() throws Exception + { + List dimensions = Collections.singletonList("d"); + int n = 10; + DateTime startOfDay = DateTimes.of("2000-01-01"); + List inputRows = new ArrayList<>(n); + for (int i = 1; i <= n; i++) { + String val = String.valueOf(i * 1.0d); + + inputRows.add(new MapBasedInputRow( + startOfDay.plusMinutes(i), + dimensions, + ImmutableMap.of("x", i, "d", val) + )); + } + + IncrementalIndex index = AggregationTestHelper.createIncrementalIndex( + inputRows.iterator(), + new NoopInputRowParser(null), + new AggregatorFactory[]{ + new CountAggregatorFactory("count"), + new SpectatorHistogramAggregatorFactory("histogram", "x") + }, + 0, + Granularities.NONE, + 100, + false + ); + + ImmutableList segments = ImmutableList.of( + new IncrementalIndexSegment(index, SegmentId.dummy("test")), + helper.persistIncrementalIndex(index, null) + ); + + GroupByQuery query = new GroupByQuery.Builder() + .setDataSource("test") + .setGranularity(Granularities.HOUR) + .setInterval("1970/2050") + .setAggregatorSpecs( + new DoubleSumAggregatorFactory("doubleSum", "histogram") + ).build(); + + Sequence seq = helper.runQueryOnSegmentsObjs(segments, query); + + List results = seq.toList(); + Assert.assertEquals(1, results.size()); + // Check timestamp + Assert.assertEquals(startOfDay.getMillis(), results.get(0).get(0)); + // Check doubleSum + Assert.assertEquals(n * segments.size(), (Double) results.get(0).get(1), 0.001); + } + private static void assertResultsMatch(List results, int rowNum, String expectedProduct) { ResultRow row = results.get(rowNum); diff --git a/extensions-contrib/spectator-histogram/src/test/java/org/apache/druid/spectator/histogram/SpectatorHistogramIndexBasedComplexColumnTest.java 
b/extensions-contrib/spectator-histogram/src/test/java/org/apache/druid/spectator/histogram/SpectatorHistogramIndexBasedComplexColumnTest.java new file mode 100644 index 000000000000..643e2e57cc56 --- /dev/null +++ b/extensions-contrib/spectator-histogram/src/test/java/org/apache/druid/spectator/histogram/SpectatorHistogramIndexBasedComplexColumnTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.spectator.histogram; + +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; + +public class SpectatorHistogramIndexBasedComplexColumnTest +{ + @Test + public void testComplexColumn() + { + final SpectatorHistogramIndexed mockIndexed = EasyMock.createMock(SpectatorHistogramIndexed.class); + EasyMock.replay(mockIndexed); + + final String typeName = "type"; + final SpectatorHistogramIndexBasedComplexColumn column = + new SpectatorHistogramIndexBasedComplexColumn("type", mockIndexed); + Assert.assertEquals(typeName, column.getTypeName()); + Assert.assertEquals(-1, column.getLength()); + + EasyMock.verify(mockIndexed); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java index 2034ba21a5be..c4c9a7875ef0 100755 --- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java @@ -28,6 +28,8 @@ import org.apache.druid.math.expr.Expr; import org.apache.druid.math.expr.ExprEval; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; @@ -35,6 +37,8 @@ import org.apache.druid.segment.FloatColumnSelector; import org.apache.druid.segment.LongColumnSelector; import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.Types; +import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import org.apache.druid.segment.vector.VectorValueSelector; import 
org.apache.druid.segment.virtual.ExpressionSelectors; @@ -428,4 +432,26 @@ public static Supplier getSimpleAggregatorCacheKeySupplier( .array(); }); } + + /** + * Whether a simple numeric aggregator should use {@link BaseObjectColumnValueSelector#getObject()}, and coerce the + * result to number, rather than using a primitive method like {@link BaseLongColumnValueSelector#getLong()}. + * + * @param fieldName field name, or null if the aggregator is expression-based + * @param columnSelectorFactory column selector factory + */ + public static boolean shouldUseObjectColumnAggregatorWrapper( + @Nullable final String fieldName, + final ColumnSelectorFactory columnSelectorFactory + ) + { + if (fieldName != null) { + ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName); + + // STRING can be coerced to a number. COMPLEX types can be subclasses of Number (or subclasses of some type + // that is coercible to a number.) + return Types.is(capabilities, ValueType.STRING) || Types.is(capabilities, ValueType.COMPLEX); + } + return false; + } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/DelegatingAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/DelegatingAggregator.java index c1b4b4090235..6b0d0fb49696 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/DelegatingAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/DelegatingAggregator.java @@ -23,7 +23,7 @@ /** * An Aggregator that delegates everything. It is used by Aggregator wrappers e.g. - * {@link StringColumnDoubleAggregatorWrapper} that modify some behavior of a delegate. + * {@link ObjectColumnDoubleAggregatorWrapper} that modify some behavior of a delegate. 
*/ public abstract class DelegatingAggregator implements Aggregator { diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/DelegatingBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/DelegatingBufferAggregator.java index 9b1aa8086b2b..87718f331088 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/DelegatingBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/DelegatingBufferAggregator.java @@ -26,7 +26,7 @@ /** * A BufferAggregator that delegates everything. It is used by BufferAggregator wrappers e.g. - * {@link StringColumnDoubleBufferAggregatorWrapper} that modify some behavior of a delegate. + * {@link ObjectColumnDoubleBufferAggregatorWrapper} that modify some behavior of a delegate. */ public abstract class DelegatingBufferAggregator implements BufferAggregator { diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregatorFactory.java index c9928f828f7f..d85d91936d78 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregatorFactory.java @@ -24,6 +24,7 @@ import org.apache.druid.common.config.NullHandling; import org.apache.druid.guice.annotations.ExtensionPoint; import org.apache.druid.segment.BaseNullableColumnValueSelector; +import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.column.ColumnType; @@ -38,8 +39,9 @@ * values to be aggregated are null values, or if no values are aggregated at all. If any of the values are non-null, * the result will be the aggregated value of the non-null values. 
* - * This superclass should only be extended by aggregators that read primitive numbers. It implements logic that is - * not valid for non-numeric selector methods such as {@link ColumnValueSelector#getObject()}. + * Aggregators that use {@link ColumnValueSelector#getObject()} must override + * {@link #useGetObject(ColumnSelectorFactory)}. Otherwise, the logic in this class is not correct for + * non-numeric selectors. * * @see BaseNullableColumnValueSelector#isNull() for why this only works in the numeric case */ @@ -51,16 +53,18 @@ public abstract class NullableNumericAggregatorFactory combiner = makeAggregateCombiner(); + return NullHandling.sqlCompatible() ? new NullableNumericAggregateCombiner<>(combiner) : combiner; } @Override @@ -85,6 +89,23 @@ public final int getMaxIntermediateSizeWithNulls() return getMaxIntermediateSize() + (NullHandling.replaceWithDefault() ? 0 : Byte.BYTES); } + /** + * Returns the selector that should be used by {@link NullableNumericAggregator} and + * {@link NullableNumericBufferAggregator} to determine if the current value is null. + */ + private BaseNullableColumnValueSelector makeNullSelector( + final T selector, + final ColumnSelectorFactory columnSelectorFactory + ) + { + if (useGetObject(columnSelectorFactory)) { + final BaseObjectColumnValueSelector objectSelector = (BaseObjectColumnValueSelector) selector; + return () -> objectSelector.getObject() == null; + } else { + return selector; + } + } + // ---- ABSTRACT METHODS BELOW ------ /** @@ -94,6 +115,17 @@ public final int getMaxIntermediateSizeWithNulls() */ protected abstract T selector(ColumnSelectorFactory columnSelectorFactory); + /** + * Returns whether the selector created by {@link #selector(ColumnSelectorFactory)} for the given + * {@link ColumnSelectorFactory} prefers {@link BaseObjectColumnValueSelector#getObject()}. + * + * For backwards compatibility with older extensions, this is a non-abstract method. 
+ */ + protected boolean useGetObject(ColumnSelectorFactory columnSelectorFactory) + { + return false; + } + /** * Creates a {@link VectorValueSelector} for the aggregated column. * diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/StringColumnDoubleAggregatorWrapper.java b/processing/src/main/java/org/apache/druid/query/aggregation/ObjectColumnDoubleAggregatorWrapper.java similarity index 93% rename from processing/src/main/java/org/apache/druid/query/aggregation/StringColumnDoubleAggregatorWrapper.java rename to processing/src/main/java/org/apache/druid/query/aggregation/ObjectColumnDoubleAggregatorWrapper.java index e970c94a028c..b250eacc78bb 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/StringColumnDoubleAggregatorWrapper.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/ObjectColumnDoubleAggregatorWrapper.java @@ -28,15 +28,15 @@ import java.util.function.Function; /** - * This class can be used to wrap Double Aggregator that consume double type columns to handle String type. + * This class can be used to wrap Double Aggregator that consume double type columns to handle Object type. 
*/ -public class StringColumnDoubleAggregatorWrapper extends DelegatingAggregator +public class ObjectColumnDoubleAggregatorWrapper extends DelegatingAggregator { private final BaseObjectColumnValueSelector selector; private final double nullValue; private final SettableValueDoubleColumnValueSelector doubleSelector; - public StringColumnDoubleAggregatorWrapper( + public ObjectColumnDoubleAggregatorWrapper( BaseObjectColumnValueSelector selector, Function delegateBuilder, double nullValue diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/StringColumnDoubleBufferAggregatorWrapper.java b/processing/src/main/java/org/apache/druid/query/aggregation/ObjectColumnDoubleBufferAggregatorWrapper.java similarity index 93% rename from processing/src/main/java/org/apache/druid/query/aggregation/StringColumnDoubleBufferAggregatorWrapper.java rename to processing/src/main/java/org/apache/druid/query/aggregation/ObjectColumnDoubleBufferAggregatorWrapper.java index fb58ad5cc498..f50a6371f93d 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/StringColumnDoubleBufferAggregatorWrapper.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/ObjectColumnDoubleBufferAggregatorWrapper.java @@ -29,15 +29,15 @@ import java.util.function.Function; /** - * This class can be used to wrap Double BufferAggregator that consume double type columns to handle String type. + * This class can be used to wrap Double BufferAggregator that consume double type columns to handle Object type. 
*/ -public class StringColumnDoubleBufferAggregatorWrapper extends DelegatingBufferAggregator +public class ObjectColumnDoubleBufferAggregatorWrapper extends DelegatingBufferAggregator { private final BaseObjectColumnValueSelector selector; private final double nullValue; private final SettableValueDoubleColumnValueSelector doubleSelector; - public StringColumnDoubleBufferAggregatorWrapper( + public ObjectColumnDoubleBufferAggregatorWrapper( BaseObjectColumnValueSelector selector, Function delegateBuilder, double nullValue diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/StringColumnFloatAggregatorWrapper.java b/processing/src/main/java/org/apache/druid/query/aggregation/ObjectColumnFloatAggregatorWrapper.java similarity index 93% rename from processing/src/main/java/org/apache/druid/query/aggregation/StringColumnFloatAggregatorWrapper.java rename to processing/src/main/java/org/apache/druid/query/aggregation/ObjectColumnFloatAggregatorWrapper.java index bb7cd65c409b..0f738b4bc0d9 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/StringColumnFloatAggregatorWrapper.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/ObjectColumnFloatAggregatorWrapper.java @@ -28,15 +28,15 @@ import java.util.function.Function; /** - * This class can be used to wrap Float Aggregator that consume float type columns to handle String type. + * This class can be used to wrap Float Aggregator that consume float type columns to handle Object type. 
*/ -public class StringColumnFloatAggregatorWrapper extends DelegatingAggregator +public class ObjectColumnFloatAggregatorWrapper extends DelegatingAggregator { private final BaseObjectColumnValueSelector selector; private final float nullValue; private final SettableValueFloatColumnValueSelector floatSelector; - public StringColumnFloatAggregatorWrapper( + public ObjectColumnFloatAggregatorWrapper( BaseObjectColumnValueSelector selector, Function delegateBuilder, float nullValue diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/StringColumnFloatBufferAggregatorWrapper.java b/processing/src/main/java/org/apache/druid/query/aggregation/ObjectColumnFloatBufferAggregatorWrapper.java similarity index 93% rename from processing/src/main/java/org/apache/druid/query/aggregation/StringColumnFloatBufferAggregatorWrapper.java rename to processing/src/main/java/org/apache/druid/query/aggregation/ObjectColumnFloatBufferAggregatorWrapper.java index 7c1c5e548810..1c3725e968aa 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/StringColumnFloatBufferAggregatorWrapper.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/ObjectColumnFloatBufferAggregatorWrapper.java @@ -29,15 +29,15 @@ import java.util.function.Function; /** - * This class can be used to wrap Float BufferAggregator that consume float type columns to handle String type. + * This class can be used to wrap Float BufferAggregator that consume float type columns to handle Object type. 
*/ -public class StringColumnFloatBufferAggregatorWrapper extends DelegatingBufferAggregator +public class ObjectColumnFloatBufferAggregatorWrapper extends DelegatingBufferAggregator { private final BaseObjectColumnValueSelector selector; private final float nullValue; private final SettableValueFloatColumnValueSelector floatSelector; - public StringColumnFloatBufferAggregatorWrapper( + public ObjectColumnFloatBufferAggregatorWrapper( BaseObjectColumnValueSelector selector, Function delegateBuilder, float nullValue diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/StringColumnLongAggregatorWrapper.java b/processing/src/main/java/org/apache/druid/query/aggregation/ObjectColumnLongAggregatorWrapper.java similarity index 93% rename from processing/src/main/java/org/apache/druid/query/aggregation/StringColumnLongAggregatorWrapper.java rename to processing/src/main/java/org/apache/druid/query/aggregation/ObjectColumnLongAggregatorWrapper.java index d218ab38cfb3..01b571a4e14d 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/StringColumnLongAggregatorWrapper.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/ObjectColumnLongAggregatorWrapper.java @@ -28,15 +28,15 @@ import java.util.function.Function; /** - * This class can be used to wrap Long Aggregator that consume long type columns to handle String type. + * This class can be used to wrap Long Aggregator that consume long type columns to handle Object type. 
*/ -public class StringColumnLongAggregatorWrapper extends DelegatingAggregator +public class ObjectColumnLongAggregatorWrapper extends DelegatingAggregator { private final BaseObjectColumnValueSelector selector; private final long nullValue; private final SettableValueLongColumnValueSelector longSelector; - public StringColumnLongAggregatorWrapper( + public ObjectColumnLongAggregatorWrapper( BaseObjectColumnValueSelector selector, Function delegateBuilder, long nullValue diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/StringColumnLongBufferAggregatorWrapper.java b/processing/src/main/java/org/apache/druid/query/aggregation/ObjectColumnLongBufferAggregatorWrapper.java similarity index 93% rename from processing/src/main/java/org/apache/druid/query/aggregation/StringColumnLongBufferAggregatorWrapper.java rename to processing/src/main/java/org/apache/druid/query/aggregation/ObjectColumnLongBufferAggregatorWrapper.java index ad2e6c2cbc58..831c9a8f26ef 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/StringColumnLongBufferAggregatorWrapper.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/ObjectColumnLongBufferAggregatorWrapper.java @@ -29,15 +29,15 @@ import java.util.function.Function; /** - * This class can be used to wrap Long BufferAggregator that consume long type columns to handle String type. + * This class can be used to wrap Long BufferAggregator that consume long type columns to handle Object type. 
*/ -public class StringColumnLongBufferAggregatorWrapper extends DelegatingBufferAggregator +public class ObjectColumnLongBufferAggregatorWrapper extends DelegatingBufferAggregator { private final BaseObjectColumnValueSelector selector; private final long nullValue; private final SettableValueLongColumnValueSelector longSelector; - public StringColumnLongBufferAggregatorWrapper( + public ObjectColumnLongBufferAggregatorWrapper( BaseObjectColumnValueSelector selector, Function delegateBuilder, long nullValue diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java index 324a10bb4d14..0fa96e226eae 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java @@ -31,11 +31,8 @@ import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; -import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; -import org.apache.druid.segment.column.Types; -import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import org.apache.druid.segment.vector.VectorValueSelector; @@ -86,8 +83,8 @@ public SimpleDoubleAggregatorFactory( @Override protected Aggregator factorize(ColumnSelectorFactory metricFactory, ColumnValueSelector selector) { - if (shouldUseStringColumnAggregatorWrapper(metricFactory)) { - return new StringColumnDoubleAggregatorWrapper( + if (AggregatorUtil.shouldUseObjectColumnAggregatorWrapper(fieldName, metricFactory)) { + return new ObjectColumnDoubleAggregatorWrapper( selector, 
SimpleDoubleAggregatorFactory.this::buildAggregator, nullValue() @@ -103,8 +100,8 @@ protected BufferAggregator factorizeBuffered( ColumnValueSelector selector ) { - if (shouldUseStringColumnAggregatorWrapper(metricFactory)) { - return new StringColumnDoubleBufferAggregatorWrapper( + if (AggregatorUtil.shouldUseObjectColumnAggregatorWrapper(fieldName, metricFactory)) { + return new ObjectColumnDoubleBufferAggregatorWrapper( selector, SimpleDoubleAggregatorFactory.this::buildBufferAggregator, nullValue() @@ -131,13 +128,10 @@ protected VectorValueSelector vectorSelector(VectorColumnSelectorFactory columnS return AggregatorUtil.makeVectorValueSelector(columnSelectorFactory, fieldName, expression, fieldExpression); } - private boolean shouldUseStringColumnAggregatorWrapper(ColumnSelectorFactory columnSelectorFactory) + @Override + protected boolean useGetObject(ColumnSelectorFactory columnSelectorFactory) { - if (fieldName != null) { - ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName); - return Types.is(capabilities, ValueType.STRING); - } - return false; + return AggregatorUtil.shouldUseObjectColumnAggregatorWrapper(fieldName, columnSelectorFactory); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java index 7633d39d9723..5268c454ce1b 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java @@ -31,10 +31,7 @@ import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; -import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnType; -import org.apache.druid.segment.column.Types; 
-import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import org.apache.druid.segment.vector.VectorValueSelector; @@ -76,8 +73,8 @@ public SimpleFloatAggregatorFactory( @Override protected Aggregator factorize(ColumnSelectorFactory metricFactory, ColumnValueSelector selector) { - if (shouldUseStringColumnAggregatorWrapper(metricFactory)) { - return new StringColumnFloatAggregatorWrapper( + if (AggregatorUtil.shouldUseObjectColumnAggregatorWrapper(fieldName, metricFactory)) { + return new ObjectColumnFloatAggregatorWrapper( selector, SimpleFloatAggregatorFactory.this::buildAggregator, nullValue() @@ -93,8 +90,8 @@ protected BufferAggregator factorizeBuffered( ColumnValueSelector selector ) { - if (shouldUseStringColumnAggregatorWrapper(metricFactory)) { - return new StringColumnFloatBufferAggregatorWrapper( + if (AggregatorUtil.shouldUseObjectColumnAggregatorWrapper(fieldName, metricFactory)) { + return new ObjectColumnFloatBufferAggregatorWrapper( selector, SimpleFloatAggregatorFactory.this::buildBufferAggregator, nullValue() @@ -121,6 +118,12 @@ protected VectorValueSelector vectorSelector(VectorColumnSelectorFactory columnS return AggregatorUtil.makeVectorValueSelector(columnSelectorFactory, fieldName, expression, fieldExpression); } + @Override + protected boolean useGetObject(ColumnSelectorFactory columnSelectorFactory) + { + return AggregatorUtil.shouldUseObjectColumnAggregatorWrapper(fieldName, columnSelectorFactory); + } + @Override public Object deserialize(Object object) { @@ -233,15 +236,6 @@ public boolean canVectorize(ColumnInspector columnInspector) return AggregatorUtil.canVectorize(columnInspector, fieldName, expression, fieldExpression); } - private boolean shouldUseStringColumnAggregatorWrapper(ColumnSelectorFactory columnSelectorFactory) - { - if (fieldName != null) { - ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName); - return 
Types.is(capabilities, ValueType.STRING); - } - return false; - } - protected abstract float nullValue(); protected abstract Aggregator buildAggregator(BaseFloatColumnValueSelector selector); diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java index 173a9cb229db..c4bc5307ed48 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java @@ -31,10 +31,7 @@ import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; -import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnType; -import org.apache.druid.segment.column.Types; -import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import org.apache.druid.segment.vector.VectorValueSelector; @@ -82,8 +79,8 @@ public SimpleLongAggregatorFactory( @Override protected Aggregator factorize(ColumnSelectorFactory metricFactory, ColumnValueSelector selector) { - if (shouldUseStringColumnAggregatorWrapper(metricFactory)) { - return new StringColumnLongAggregatorWrapper( + if (AggregatorUtil.shouldUseObjectColumnAggregatorWrapper(fieldName, metricFactory)) { + return new ObjectColumnLongAggregatorWrapper( selector, SimpleLongAggregatorFactory.this::buildAggregator, nullValue() @@ -99,8 +96,8 @@ protected BufferAggregator factorizeBuffered( ColumnValueSelector selector ) { - if (shouldUseStringColumnAggregatorWrapper(metricFactory)) { - return new StringColumnLongBufferAggregatorWrapper( + if (AggregatorUtil.shouldUseObjectColumnAggregatorWrapper(fieldName, metricFactory)) { + return new ObjectColumnLongBufferAggregatorWrapper( selector, 
SimpleLongAggregatorFactory.this::buildBufferAggregator, nullValue() @@ -127,6 +124,12 @@ protected VectorValueSelector vectorSelector(VectorColumnSelectorFactory columnS return AggregatorUtil.makeVectorValueSelector(columnSelectorFactory, fieldName, expression, fieldExpression); } + @Override + protected boolean useGetObject(ColumnSelectorFactory columnSelectorFactory) + { + return AggregatorUtil.shouldUseObjectColumnAggregatorWrapper(fieldName, columnSelectorFactory); + } + @Override public Object deserialize(Object object) { @@ -236,15 +239,6 @@ public boolean canVectorize(ColumnInspector columnInspector) return AggregatorUtil.canVectorize(columnInspector, fieldName, expression, fieldExpression); } - private boolean shouldUseStringColumnAggregatorWrapper(ColumnSelectorFactory columnSelectorFactory) - { - if (fieldName != null) { - ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName); - return Types.is(capabilities, ValueType.STRING); - } - return false; - } - protected abstract long nullValue(); protected abstract Aggregator buildAggregator(BaseLongColumnValueSelector selector); diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMaxAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMaxAggregationTest.java index e0dc85a4450d..21c8aeacc254 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMaxAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMaxAggregationTest.java @@ -63,7 +63,7 @@ public void setup() selector = new TestDoubleColumnSelectorImpl(values); colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(selector); - EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(null); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(null).anyTimes(); 
EasyMock.replay(colSelectorFactory); VectorValueSelector vectorValueSelector = EasyMock.createMock(VectorValueSelector.class); diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMinAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMinAggregationTest.java index 5b4041b84225..40f72719f7ff 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMinAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMinAggregationTest.java @@ -63,7 +63,7 @@ public void setup() selector = new TestDoubleColumnSelectorImpl(values); colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(selector); - EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(null); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(null).anyTimes(); EasyMock.replay(colSelectorFactory); diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/LongMaxAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/LongMaxAggregationTest.java index d063658d76a2..535efb02faf7 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/LongMaxAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/LongMaxAggregationTest.java @@ -62,7 +62,7 @@ public void setup() selector = new TestLongColumnSelector(values); colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(selector); - EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(null); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(null).anyTimes(); EasyMock.replay(colSelectorFactory); diff --git 
a/processing/src/test/java/org/apache/druid/query/aggregation/LongMinAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/LongMinAggregationTest.java index f651ce074b93..129f81d245a5 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/LongMinAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/LongMinAggregationTest.java @@ -62,7 +62,7 @@ public void setup() selector = new TestLongColumnSelector(values); colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(selector); - EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(null); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(null).anyTimes(); EasyMock.replay(colSelectorFactory); VectorValueSelector vectorValueSelector = EasyMock.createMock(VectorValueSelector.class);