Skip to content

Commit

Permalink
Attempt to coerce COMPLEX to number in numeric aggregators. (apache#1…
Browse files Browse the repository at this point in the history
…6564)

* Coerce COMPLEX to number in numeric aggregators.

PR apache#15371 eliminated ObjectColumnSelector's built-in implementations of
numeric methods, which had been marked deprecated.

However, some complex types, like SpectatorHistogram, can be successfully coerced
to number. The documentation for spectator histograms encourages taking advantage of
this by aggregating complex columns with doubleSum and longSum. Currently, this
doesn't work properly for IncrementalIndex, where the behavior relied on those
deprecated ObjectColumnSelector methods.

This patch fixes the behavior by making two changes:

1) SimpleXYZAggregatorFactory (XYZ = type; base class for simple numeric aggregators;
   all of these extend NullableNumericAggregatorFactory) use getObject for STRING
   and COMPLEX. Previously, getObject was only used for STRING.

2) NullableNumericAggregatorFactory (base class for simple numeric aggregators)
   has a new protected method "useGetObject". This allows the base class to
   correctly check for null (using getObject or isNull).

The patch also adds a test for SpectatorHistogram + doubleSum + IncrementalIndex.

* Fix tests.

* Remove the special ColumnValueSelector.

* Add test.
  • Loading branch information
gianm authored Jul 25, 2024
1 parent b5f117b commit b2a88da
Show file tree
Hide file tree
Showing 21 changed files with 236 additions and 144 deletions.
10 changes: 10 additions & 0 deletions extensions-contrib/spectator-histogram/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@
<artifactId>error_prone_annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-sql</artifactId>
Expand Down Expand Up @@ -137,5 +142,10 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,12 @@

package org.apache.druid.spectator.histogram;

import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.column.ComplexColumn;
import org.apache.druid.segment.data.ReadableOffset;

import javax.annotation.Nullable;

public class SpectatorHistogramIndexBasedComplexColumn implements ComplexColumn
{
private final SpectatorHistogramIndexed index;
private final String typeName;
private static final Number ZERO = 0;

public SpectatorHistogramIndexBasedComplexColumn(String typeName, SpectatorHistogramIndexed index)
{
Expand Down Expand Up @@ -59,72 +53,11 @@ public Object getRowValue(int rowNum)
@Override
public int getLength()
{
return index.size();
return -1;
}

@Override
public void close()
{
}

@Override
public ColumnValueSelector<SpectatorHistogram> makeColumnValueSelector(ReadableOffset offset)
{
// Use ColumnValueSelector directly so that we support being queried as a Number using
// longSum or doubleSum aggregators, the NullableNumericBufferAggregator will call isNull.
// This allows us to behave as a Number or SpectatorHistogram object.
// When queried as a Number, we're returning the count of entries in the histogram.
// As such, we can safely return 0 where the histogram is null.
return new ColumnValueSelector<SpectatorHistogram>()
{
@Override
public boolean isNull()
{
return getObject() == null;
}

private Number getOrZero()
{
SpectatorHistogram histogram = getObject();
return histogram != null ? histogram : ZERO;
}

@Override
public long getLong()
{
return getOrZero().longValue();
}

@Override
public float getFloat()
{
return getOrZero().floatValue();
}

@Override
public double getDouble()
{
return getOrZero().doubleValue();
}

@Nullable
@Override
public SpectatorHistogram getObject()
{
return (SpectatorHistogram) getRowValue(offset.getOffset());
}

@Override
public Class classOfObject()
{
return getClazz();
}

@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("column", SpectatorHistogramIndexBasedComplexColumn.this);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,14 @@
package org.apache.druid.spectator.histogram;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.netflix.spectator.api.histogram.PercentileBuckets;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.NoopInputRowParser;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.Druids;
Expand All @@ -32,6 +38,9 @@
import org.apache.druid.query.aggregation.AggregationTestHelper;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.ResultRow;
Expand All @@ -42,13 +51,17 @@
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -59,6 +72,7 @@
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -716,6 +730,59 @@ public void testPercentilePostAggregator() throws Exception
}
}

@Test
public void testBuildingAndCountingHistogramsIncrementalIndex() throws Exception
{
List<String> dimensions = Collections.singletonList("d");
int n = 10;
DateTime startOfDay = DateTimes.of("2000-01-01");
List<InputRow> inputRows = new ArrayList<>(n);
for (int i = 1; i <= n; i++) {
String val = String.valueOf(i * 1.0d);

inputRows.add(new MapBasedInputRow(
startOfDay.plusMinutes(i),
dimensions,
ImmutableMap.of("x", i, "d", val)
));
}

IncrementalIndex index = AggregationTestHelper.createIncrementalIndex(
inputRows.iterator(),
new NoopInputRowParser(null),
new AggregatorFactory[]{
new CountAggregatorFactory("count"),
new SpectatorHistogramAggregatorFactory("histogram", "x")
},
0,
Granularities.NONE,
100,
false
);

ImmutableList<Segment> segments = ImmutableList.of(
new IncrementalIndexSegment(index, SegmentId.dummy("test")),
helper.persistIncrementalIndex(index, null)
);

GroupByQuery query = new GroupByQuery.Builder()
.setDataSource("test")
.setGranularity(Granularities.HOUR)
.setInterval("1970/2050")
.setAggregatorSpecs(
new DoubleSumAggregatorFactory("doubleSum", "histogram")
).build();

Sequence<ResultRow> seq = helper.runQueryOnSegmentsObjs(segments, query);

List<ResultRow> results = seq.toList();
Assert.assertEquals(1, results.size());
// Check timestamp
Assert.assertEquals(startOfDay.getMillis(), results.get(0).get(0));
// Check doubleSum
Assert.assertEquals(n * segments.size(), (Double) results.get(0).get(1), 0.001);
}

private static void assertResultsMatch(List<ResultRow> results, int rowNum, String expectedProduct)
{
ResultRow row = results.get(rowNum);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.spectator.histogram;

import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;

public class SpectatorHistogramIndexBasedComplexColumnTest
{
@Test
public void testComplexColumn()
{
final SpectatorHistogramIndexed mockIndexed = EasyMock.createMock(SpectatorHistogramIndexed.class);
EasyMock.replay(mockIndexed);

final String typeName = "type";
final SpectatorHistogramIndexBasedComplexColumn column =
new SpectatorHistogramIndexBasedComplexColumn("type", mockIndexed);
Assert.assertEquals(typeName, column.getTypeName());
Assert.assertEquals(-1, column.getLength());

EasyMock.verify(mockIndexed);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,17 @@
import org.apache.druid.math.expr.Expr;
import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DoubleColumnSelector;
import org.apache.druid.segment.FloatColumnSelector;
import org.apache.druid.segment.LongColumnSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.Types;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorValueSelector;
import org.apache.druid.segment.virtual.ExpressionSelectors;
Expand Down Expand Up @@ -428,4 +432,26 @@ public static Supplier<byte[]> getSimpleAggregatorCacheKeySupplier(
.array();
});
}

/**
* Whether a simple numeric aggregator should use {@link BaseObjectColumnValueSelector#getObject()}, and coerce the
* result to number, rather than using a primitive method like {@link BaseLongColumnValueSelector#getLong()}.
*
* @param fieldName field name, or null if the aggregator is expression-based
* @param columnSelectorFactory column selector factory
*/
public static boolean shouldUseObjectColumnAggregatorWrapper(
@Nullable final String fieldName,
final ColumnSelectorFactory columnSelectorFactory
)
{
if (fieldName != null) {
ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);

// STRING can be coerced to a number. COMPLEX types can be subclasses of Number (or subclasses of some type
// that is coercible to a number.)
return Types.is(capabilities, ValueType.STRING) || Types.is(capabilities, ValueType.COMPLEX);
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

/**
* An Aggregator that delegates everything. It is used by Aggregator wrappers e.g.
* {@link StringColumnDoubleAggregatorWrapper} that modify some behavior of a delegate.
* {@link ObjectColumnDoubleAggregatorWrapper} that modify some behavior of a delegate.
*/
public abstract class DelegatingAggregator implements Aggregator
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

/**
* A BufferAggregator that delegates everything. It is used by BufferAggregator wrappers e.g.
* {@link StringColumnDoubleBufferAggregatorWrapper} that modify some behavior of a delegate.
* {@link ObjectColumnDoubleBufferAggregatorWrapper} that modify some behavior of a delegate.
*/
public abstract class DelegatingBufferAggregator implements BufferAggregator
{
Expand Down
Loading

0 comments on commit b2a88da

Please sign in to comment.