Skip to content

Commit

Permalink
Fix for schema mismatch to go down using the non vectorize path till …
Browse files Browse the repository at this point in the history
…we update the vectorized aggs properly (#14924)

* Fix for schema mismatch to go down using the non vectorize path till we update the vectorized aggs properly

* Fixing a failed test

* Updating numericNilAgg

* Moving to use default values in case of nil agg

* Adding the same for first agg

* Fixing a test

* fixing vectorized string agg for last/first with cast if numeric

* Updating tests to remove mockito and cover the case of string first/last on non string columns

* Updating a test to vectorize

* Addressing review comments: Name change to NilVectorAggregator and using static variables now

* fixing intellij inspections
  • Loading branch information
somu-imply authored Sep 13, 2023
1 parent 7f757e3 commit bf99d2c
Show file tree
Hide file tree
Showing 15 changed files with 497 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFact
if (capabilities == null || capabilities.isNumeric()) {
return new DoubleAnyVectorAggregator(selectorFactory.makeValueSelector(fieldName));
} else {
return NumericNilVectorAggregator.doubleNilVectorAggregator();
return NilVectorAggregator.doubleNilVectorAggregator();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFact
if (capabilities == null || capabilities.isNumeric()) {
return new FloatAnyVectorAggregator(selectorFactory.makeValueSelector(fieldName));
} else {
return NumericNilVectorAggregator.floatNilVectorAggregator();
return NilVectorAggregator.floatNilVectorAggregator();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFact
if (capabilities == null || capabilities.isNumeric()) {
return new LongAnyVectorAggregator(selectorFactory.makeValueSelector(fieldName));
} else {
return NumericNilVectorAggregator.longNilVectorAggregator();
return NilVectorAggregator.longNilVectorAggregator();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.query.aggregation.any;

import org.apache.druid.collections.SerializablePair;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.aggregation.VectorAggregator;

Expand All @@ -28,48 +29,57 @@
/**
* A vector aggregator that returns the default numeric value.
*/
public class NumericNilVectorAggregator implements VectorAggregator
public class NilVectorAggregator implements VectorAggregator
{
private static final NumericNilVectorAggregator DOUBLE_NIL_VECTOR_AGGREGATOR = new NumericNilVectorAggregator(
private static final NilVectorAggregator DOUBLE_NIL_VECTOR_AGGREGATOR = new NilVectorAggregator(
NullHandling.defaultDoubleValue()
);

private static final NumericNilVectorAggregator FLOAT_NIL_VECTOR_AGGREGATOR = new NumericNilVectorAggregator(
private static final NilVectorAggregator FLOAT_NIL_VECTOR_AGGREGATOR = new NilVectorAggregator(
NullHandling.defaultFloatValue()
);

private static final NumericNilVectorAggregator LONG_NIL_VECTOR_AGGREGATOR = new NumericNilVectorAggregator(
private static final NilVectorAggregator LONG_NIL_VECTOR_AGGREGATOR = new NilVectorAggregator(
NullHandling.defaultLongValue()
);

public static final SerializablePair<Long, Double> DOUBLE_NIL_PAIR = new SerializablePair<>(0L, NullHandling.defaultDoubleValue());
public static final SerializablePair<Long, Long> LONG_NIL_PAIR = new SerializablePair<>(0L, NullHandling.defaultLongValue());
public static final SerializablePair<Long, Float> FLOAT_NIL_PAIR = new SerializablePair<>(0L, NullHandling.defaultFloatValue());

/**
* @return A vectorized aggregator that returns the default double value.
*/
public static NumericNilVectorAggregator doubleNilVectorAggregator()
public static NilVectorAggregator doubleNilVectorAggregator()
{
return DOUBLE_NIL_VECTOR_AGGREGATOR;
}

/**
* @return A vectorized aggregator that returns the default float value.
*/
public static NumericNilVectorAggregator floatNilVectorAggregator()
public static NilVectorAggregator floatNilVectorAggregator()
{
return FLOAT_NIL_VECTOR_AGGREGATOR;
}

/**
* @return A vectorized aggregator that returns the default long value.
*/
public static NumericNilVectorAggregator longNilVectorAggregator()
public static NilVectorAggregator longNilVectorAggregator()
{
return LONG_NIL_VECTOR_AGGREGATOR;
}

@Nullable
private final Object returnValue;

private NumericNilVectorAggregator(@Nullable Object returnValue)
public static NilVectorAggregator of(Object returnValue)
{
return new NilVectorAggregator(returnValue);
}

private NilVectorAggregator(@Nullable Object returnValue)
{
this.returnValue = returnValue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.aggregation.any.NumericNilVectorAggregator;
import org.apache.druid.query.aggregation.any.NilVectorAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
Expand Down Expand Up @@ -149,7 +149,7 @@ public VectorAggregator factorizeVector(
timeColumn);
return new DoubleFirstVectorAggregator(timeSelector, valueSelector);
}
return NumericNilVectorAggregator.doubleNilVectorAggregator();
return NilVectorAggregator.of(NilVectorAggregator.DOUBLE_NIL_PAIR);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.aggregation.any.NumericNilVectorAggregator;
import org.apache.druid.query.aggregation.any.NilVectorAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseFloatColumnValueSelector;
Expand Down Expand Up @@ -138,7 +138,7 @@ public VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelect
VectorValueSelector timeSelector = columnSelectorFactory.makeValueSelector(timeColumn);
return new FloatFirstVectorAggregator(timeSelector, valueSelector);
}
return NumericNilVectorAggregator.floatNilVectorAggregator();
return NilVectorAggregator.of(NilVectorAggregator.FLOAT_NIL_PAIR);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.aggregation.any.NumericNilVectorAggregator;
import org.apache.druid.query.aggregation.any.NilVectorAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
Expand Down Expand Up @@ -138,7 +138,7 @@ public VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelect
timeColumn);
return new LongFirstVectorAggregator(timeSelector, valueSelector);
}
return NumericNilVectorAggregator.longNilVectorAggregator();
return NilVectorAggregator.of(NilVectorAggregator.LONG_NIL_PAIR);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,14 @@
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.Types;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorObjectSelector;
import org.apache.druid.segment.vector.VectorValueSelector;
import org.apache.druid.segment.virtual.ExpressionVectorSelectors;


import javax.annotation.Nullable;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -188,6 +191,17 @@ public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFact
{
final VectorValueSelector timeSelector = selectorFactory.makeValueSelector(timeColumn);
ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName);
if (Types.isNumeric(capabilities)) {
VectorValueSelector valueSelector = selectorFactory.makeValueSelector(fieldName);
VectorObjectSelector objectSelector = ExpressionVectorSelectors.castValueSelectorToObject(
selectorFactory.getReadableVectorInspector(),
fieldName,
valueSelector,
capabilities.toColumnType(),
ColumnType.STRING
);
return new StringFirstVectorAggregator(timeSelector, objectSelector, maxStringBytes);
}
if (capabilities != null) {
if (capabilities.is(ValueType.STRING) && capabilities.isDictionaryEncoded().isTrue()) {
// Case 1: Single value string with dimension selector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.collections.SerializablePair;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.aggregation.any.NumericNilVectorAggregator;
import org.apache.druid.query.aggregation.any.NilVectorAggregator;
import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
Expand All @@ -42,6 +43,7 @@
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.Types;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorValueSelector;

Expand Down Expand Up @@ -125,14 +127,12 @@ public VectorAggregator factorizeVector(
)
{
ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName);

VectorValueSelector timeSelector = columnSelectorFactory.makeValueSelector(
timeColumn);
if (capabilities == null || capabilities.isNumeric()) {
if (Types.isNumeric(capabilities)) {
VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName);
VectorValueSelector timeSelector = columnSelectorFactory.makeValueSelector(timeColumn);
return new DoubleLastVectorAggregator(timeSelector, valueSelector);
} else {
return NumericNilVectorAggregator.doubleNilVectorAggregator();
return NilVectorAggregator.of(new SerializablePair<>(0L, NullHandling.defaultDoubleValue()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.collections.SerializablePair;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.aggregation.any.NumericNilVectorAggregator;
import org.apache.druid.query.aggregation.any.NilVectorAggregator;
import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
Expand All @@ -42,6 +43,7 @@
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.Types;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorValueSelector;

Expand Down Expand Up @@ -136,15 +138,13 @@ public VectorAggregator factorizeVector(
VectorColumnSelectorFactory columnSelectorFactory
)
{
ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName);

VectorValueSelector timeSelector = columnSelectorFactory.makeValueSelector(
timeColumn);
if (capabilities == null || capabilities.isNumeric()) {
final ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
if (Types.isNumeric(capabilities)) {
VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName);
VectorValueSelector timeSelector = columnSelectorFactory.makeValueSelector(timeColumn);
return new FloatLastVectorAggregator(timeSelector, valueSelector);
} else {
return NumericNilVectorAggregator.floatNilVectorAggregator();
return NilVectorAggregator.of(new SerializablePair<>(0L, NullHandling.defaultFloatValue()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.collections.SerializablePair;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.aggregation.any.NumericNilVectorAggregator;
import org.apache.druid.query.aggregation.any.NilVectorAggregator;
import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
Expand All @@ -42,6 +43,7 @@
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.Types;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorValueSelector;

Expand Down Expand Up @@ -136,14 +138,13 @@ public VectorAggregator factorizeVector(
VectorColumnSelectorFactory columnSelectorFactory
)
{
ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName);
VectorValueSelector timeSelector = columnSelectorFactory.makeValueSelector(
timeColumn);
if (capabilities == null || capabilities.isNumeric()) {
final ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
if (Types.isNumeric(capabilities)) {
VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName);
VectorValueSelector timeSelector = columnSelectorFactory.makeValueSelector(timeColumn);
return new LongLastVectorAggregator(timeSelector, valueSelector);
} else {
return NumericNilVectorAggregator.longNilVectorAggregator();
return NilVectorAggregator.of(new SerializablePair<>(0L, NullHandling.defaultLongValue()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.Types;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorObjectSelector;
import org.apache.druid.segment.vector.VectorValueSelector;
import org.apache.druid.segment.virtual.ExpressionVectorSelectors;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -156,16 +158,25 @@ public boolean canVectorize(ColumnInspector columnInspector)
public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
{

ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName);
final ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName);
VectorValueSelector timeSelector = selectorFactory.makeValueSelector(timeColumn);
if (Types.isNumeric(capabilities)) {
VectorValueSelector valueSelector = selectorFactory.makeValueSelector(fieldName);
VectorObjectSelector objectSelector = ExpressionVectorSelectors.castValueSelectorToObject(
selectorFactory.getReadableVectorInspector(),
fieldName,
valueSelector,
capabilities.toColumnType(),
ColumnType.STRING
);
return new StringLastVectorAggregator(timeSelector, objectSelector, maxStringBytes);
}
VectorObjectSelector vSelector = selectorFactory.makeObjectSelector(fieldName);
VectorValueSelector timeSelector = selectorFactory.makeValueSelector(
timeColumn);
if (capabilities != null) {
return new StringLastVectorAggregator(timeSelector, vSelector, maxStringBytes);
} else {
return new StringLastVectorAggregator(null, vSelector, maxStringBytes);
}

}

@Override
Expand Down
Loading

0 comments on commit bf99d2c

Please sign in to comment.