Skip to content

Commit

Permalink
Change *VectorAggregator to support pair serdes
Browse files Browse the repository at this point in the history
  • Loading branch information
ankit0811 authored and Ankit Kothari committed Oct 12, 2023
1 parent 97d1dac commit f5a1c50
Show file tree
Hide file tree
Showing 48 changed files with 1,352 additions and 470 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import org.apache.druid.collections.SerializablePair;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.segment.serde.cell.StagedSerde;
import org.apache.druid.segment.serde.cell.StorableBuffer;

Expand Down Expand Up @@ -75,13 +76,16 @@ public void store(ByteBuffer byteBuffer)
}

if (rhsObject != null) {
byteBuffer.put(NullHandling.IS_NOT_NULL_BYTE);
if (pairClass.isAssignableFrom(SerializablePairLongLong.class)) {
byteBuffer.putLong((long) rhsObject);
} else if (pairClass.isAssignableFrom(SerializablePairLongDouble.class)) {
byteBuffer.putDouble((double) rhsObject);
} else if (pairClass.isAssignableFrom(SerializablePairLongFloat.class)) {
byteBuffer.putFloat((float) rhsObject);
}
} else {
byteBuffer.put(NullHandling.IS_NULL_BYTE);
}
}

Expand All @@ -100,7 +104,7 @@ public int getSerializedSize()
}
}

return (useIntegerDelta ? Integer.BYTES : Long.BYTES) + rhsBytes;
return (useIntegerDelta ? Integer.BYTES : Long.BYTES) + Byte.BYTES + rhsBytes;
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.base.Preconditions;
import org.apache.druid.collections.SerializablePair;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.segment.serde.cell.StagedSerde;
import org.apache.druid.segment.serde.cell.StorableBuffer;

Expand Down Expand Up @@ -57,13 +58,16 @@ public void store(ByteBuffer byteBuffer)
Preconditions.checkNotNull(value.getLhs(), String.format(Locale.ENGLISH, "Long in %s must be non-null", pairCLass.getSimpleName()));
byteBuffer.putLong(value.getLhs());
if (rhsObject != null) {
byteBuffer.put(NullHandling.IS_NOT_NULL_BYTE);
if (pairCLass.isAssignableFrom(SerializablePairLongLong.class)) {
byteBuffer.putLong((long) rhsObject);
} else if (pairCLass.isAssignableFrom(SerializablePairLongDouble.class)) {
byteBuffer.putDouble((double) rhsObject);
} else if (pairCLass.isAssignableFrom(SerializablePairLongFloat.class)) {
byteBuffer.putFloat((float) rhsObject);
}
} else {
byteBuffer.put(NullHandling.IS_NULL_BYTE);
}
}

Expand All @@ -81,7 +85,7 @@ public int getSerializedSize()
rhsBytes = Float.BYTES;
}
}
return Long.BYTES + rhsBytes;
return Long.BYTES + Byte.BYTES + rhsBytes;
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.druid.query.aggregation;

import org.apache.druid.collections.SerializablePair;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.ObjectStrategy;
Expand All @@ -35,6 +34,8 @@ public class SerializablePairLongDoubleComplexMetricSerde extends AbstractSerial
{
public static final String TYPE_NAME = "serializablePairLongDouble";

private static final SerializablePairLongDoubleSimpleStagedSerde SERDE = new SerializablePairLongDoubleSimpleStagedSerde();

private static final Comparator<SerializablePair<Long, Double>> COMPARATOR = SerializablePair.createNullHandlingComparator(
Double::compare,
true
Expand Down Expand Up @@ -90,32 +91,17 @@ public Class<? extends SerializablePairLongDouble> getClazz()
@Override
public SerializablePairLongDouble fromByteBuffer(ByteBuffer buffer, int numBytes)
{
final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
long lhs = readOnlyBuffer.getLong();
boolean isNotNull = readOnlyBuffer.get() == NullHandling.IS_NOT_NULL_BYTE;
if (isNotNull) {
return new SerializablePairLongDouble(lhs, readOnlyBuffer.getDouble());
} else {
return new SerializablePairLongDouble(lhs, null);
}
ByteBuffer readOnlyByteBuffer = buffer.asReadOnlyBuffer().order(buffer.order());

readOnlyByteBuffer.limit(buffer.position() + numBytes);

return SERDE.deserialize(readOnlyByteBuffer);
}

@Override
public byte[] toBytes(@Nullable SerializablePairLongDouble inPair)
{
if (inPair == null) {
return new byte[]{};
}

ByteBuffer bbuf = ByteBuffer.allocate(Long.BYTES + Byte.BYTES + Double.BYTES);
bbuf.putLong(inPair.lhs);
if (inPair.rhs == null) {
bbuf.put(NullHandling.IS_NULL_BYTE);
} else {
bbuf.put(NullHandling.IS_NOT_NULL_BYTE);
bbuf.putDouble(inPair.rhs);
}
return bbuf.array();
return SERDE.serialize(inPair);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.druid.query.aggregation;

import org.apache.druid.common.config.NullHandling;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
Expand Down Expand Up @@ -51,7 +53,7 @@ public SerializablePairLongDouble deserialize(ByteBuffer byteBuffer)
lhs += minValue;

Double rhs = null;
if (readOnlyBuffer.hasRemaining()) {
if (readOnlyBuffer.get() == NullHandling.IS_NOT_NULL_BYTE) {
rhs = readOnlyBuffer.getDouble();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.druid.query.aggregation;

import org.apache.druid.common.config.NullHandling;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
Expand All @@ -42,7 +44,7 @@ public SerializablePairLongDouble deserialize(ByteBuffer byteBuffer)
long lhs = readOnlyBuffer.getLong();

Double rhs = null;
if (readOnlyBuffer.hasRemaining()) {
if (readOnlyBuffer.get() == NullHandling.IS_NOT_NULL_BYTE) {
rhs = readOnlyBuffer.getDouble();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.druid.query.aggregation;

import org.apache.druid.collections.SerializablePair;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.ObjectStrategy;
Expand All @@ -35,6 +34,8 @@ public class SerializablePairLongFloatComplexMetricSerde extends AbstractSeriali
{
public static final String TYPE_NAME = "serializablePairLongFloat";

private static final SerializablePairLongFloatSimpleStagedSerde SERDE = new SerializablePairLongFloatSimpleStagedSerde();

private static final Comparator<SerializablePair<Long, Float>> COMPARATOR = SerializablePair.createNullHandlingComparator(
Float::compare,
true
Expand Down Expand Up @@ -91,32 +92,17 @@ public Class<? extends SerializablePairLongFloat> getClazz()
@Override
public SerializablePairLongFloat fromByteBuffer(ByteBuffer buffer, int numBytes)
{
final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
long lhs = readOnlyBuffer.getLong();
boolean isNotNull = readOnlyBuffer.get() == NullHandling.IS_NOT_NULL_BYTE;
if (isNotNull) {
return new SerializablePairLongFloat(lhs, readOnlyBuffer.getFloat());
} else {
return new SerializablePairLongFloat(lhs, null);
}
ByteBuffer readOnlyByteBuffer = buffer.asReadOnlyBuffer().order(buffer.order());

readOnlyByteBuffer.limit(buffer.position() + numBytes);

return SERDE.deserialize(readOnlyByteBuffer);
}

@Override
public byte[] toBytes(@Nullable SerializablePairLongFloat inPair)
{
if (inPair == null) {
return new byte[]{};
}

ByteBuffer bbuf = ByteBuffer.allocate(Long.BYTES + Byte.BYTES + Float.BYTES);
bbuf.putLong(inPair.lhs);
if (inPair.rhs == null) {
bbuf.put(NullHandling.IS_NULL_BYTE);
} else {
bbuf.put(NullHandling.IS_NOT_NULL_BYTE);
bbuf.putFloat(inPair.rhs);
}
return bbuf.array();
return SERDE.serialize(inPair);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.druid.query.aggregation;

import org.apache.druid.common.config.NullHandling;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
Expand Down Expand Up @@ -51,7 +53,7 @@ public SerializablePairLongFloat deserialize(ByteBuffer byteBuffer)
lhs += minValue;

Float rhs = null;
if (readOnlyBuffer.hasRemaining()) {
if (readOnlyBuffer.get() == NullHandling.IS_NOT_NULL_BYTE) {
rhs = readOnlyBuffer.getFloat();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.druid.query.aggregation;

import org.apache.druid.common.config.NullHandling;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
Expand All @@ -42,7 +44,7 @@ public SerializablePairLongFloat deserialize(ByteBuffer byteBuffer)
long lhs = readOnlyBuffer.getLong();

Float rhs = null;
if (readOnlyBuffer.hasRemaining()) {
if (readOnlyBuffer.get() == NullHandling.IS_NOT_NULL_BYTE) {
rhs = readOnlyBuffer.getFloat();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.druid.query.aggregation;

import org.apache.druid.collections.SerializablePair;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.ObjectStrategy;
Expand All @@ -35,6 +34,8 @@ public class SerializablePairLongLongComplexMetricSerde extends AbstractSerializ
{
public static final String TYPE_NAME = "serializablePairLongLong";

private static final SerializablePairLongLongSimpleStagedSerde SERDE = new SerializablePairLongLongSimpleStagedSerde();

private static final Comparator<SerializablePair<Long, Long>> COMPARATOR = SerializablePair.createNullHandlingComparator(
Long::compare,
true
Expand Down Expand Up @@ -90,32 +91,17 @@ public Class<? extends SerializablePairLongLong> getClazz()
@Override
public SerializablePairLongLong fromByteBuffer(ByteBuffer buffer, int numBytes)
{
final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
long lhs = readOnlyBuffer.getLong();
boolean isNotNull = readOnlyBuffer.get() == NullHandling.IS_NOT_NULL_BYTE;
if (isNotNull) {
return new SerializablePairLongLong(lhs, readOnlyBuffer.getLong());
} else {
return new SerializablePairLongLong(lhs, null);
}
ByteBuffer readOnlyByteBuffer = buffer.asReadOnlyBuffer().order(buffer.order());

readOnlyByteBuffer.limit(buffer.position() + numBytes);

return SERDE.deserialize(readOnlyByteBuffer);
}

@Override
public byte[] toBytes(@Nullable SerializablePairLongLong inPair)
{
if (inPair == null) {
return new byte[]{};
}

ByteBuffer bbuf = ByteBuffer.allocate(Long.BYTES + Byte.BYTES + Long.BYTES);
bbuf.putLong(inPair.lhs);
if (inPair.rhs == null) {
bbuf.put(NullHandling.IS_NULL_BYTE);
} else {
bbuf.put(NullHandling.IS_NOT_NULL_BYTE);
bbuf.putLong(inPair.rhs);
}
return bbuf.array();
return SERDE.serialize(inPair);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.druid.query.aggregation;

import org.apache.druid.common.config.NullHandling;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
Expand Down Expand Up @@ -51,7 +53,7 @@ public SerializablePairLongLong deserialize(ByteBuffer byteBuffer)
lhs += minValue;

Long rhs = null;
if (readOnlyBuffer.hasRemaining()) {
if (readOnlyBuffer.get() == NullHandling.IS_NOT_NULL_BYTE) {
rhs = readOnlyBuffer.getLong();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.druid.query.aggregation;

import org.apache.druid.common.config.NullHandling;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
Expand All @@ -42,7 +44,7 @@ public SerializablePairLongLong deserialize(ByteBuffer byteBuffer)
long lhs = readOnlyBuffer.getLong();

Long rhs = null;
if (readOnlyBuffer.hasRemaining()) {
if (readOnlyBuffer.get() == NullHandling.IS_NOT_NULL_BYTE) {
rhs = readOnlyBuffer.getLong();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

package org.apache.druid.query.aggregation.any;

import org.apache.druid.collections.SerializablePair;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.aggregation.SerializablePairLongDouble;
import org.apache.druid.query.aggregation.SerializablePairLongFloat;
import org.apache.druid.query.aggregation.SerializablePairLongLong;
import org.apache.druid.query.aggregation.VectorAggregator;

import javax.annotation.Nullable;
Expand All @@ -43,9 +45,9 @@ public class NilVectorAggregator implements VectorAggregator
NullHandling.defaultLongValue()
);

public static final SerializablePair<Long, Double> DOUBLE_NIL_PAIR = new SerializablePair<>(0L, NullHandling.defaultDoubleValue());
public static final SerializablePair<Long, Long> LONG_NIL_PAIR = new SerializablePair<>(0L, NullHandling.defaultLongValue());
public static final SerializablePair<Long, Float> FLOAT_NIL_PAIR = new SerializablePair<>(0L, NullHandling.defaultFloatValue());
public static final SerializablePairLongDouble DOUBLE_NIL_PAIR = new SerializablePairLongDouble(0L, NullHandling.defaultDoubleValue());
public static final SerializablePairLongLong LONG_NIL_PAIR = new SerializablePairLongLong(0L, NullHandling.defaultLongValue());
public static final SerializablePairLongFloat FLOAT_NIL_PAIR = new SerializablePairLongFloat(0L, NullHandling.defaultFloatValue());

/**
* @return A vectorized aggregator that returns the default double value.
Expand Down
Loading

0 comments on commit f5a1c50

Please sign in to comment.