Skip to content

Commit

Permalink
add substituteCombiningFactory implementations for datasketches aggs (a…
Browse files Browse the repository at this point in the history
…pache#17314)

Follow up to apache#17214, adds implementations for substituteCombiningFactory so that more
datasketches aggs can match projections, along with some projections tests for datasketches.
  • Loading branch information
clintropolis authored Oct 10, 2024
1 parent fb38e48 commit a6236c3
Show file tree
Hide file tree
Showing 13 changed files with 658 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,10 @@ public AggregatorFactory substituteCombiningFactory(AggregatorFactory preAggrega
return null;
}
HllSketchAggregatorFactory that = (HllSketchAggregatorFactory) preAggregated;
if (lgK == that.lgK && tgtHllType == that.tgtHllType && stringEncoding == that.stringEncoding && Objects.equals(
fieldName,
that.fieldName
)) {
if (lgK <= that.lgK &&
stringEncoding == that.stringEncoding &&
Objects.equals(fieldName, that.fieldName)
) {
return getCombiningFactory();
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,23 @@ public byte[] getCacheKey()
return new CacheKeyBuilder(cacheTypeId).appendString(name).appendString(fieldName).appendInt(k).build();
}

@Nullable
@Override
public AggregatorFactory substituteCombiningFactory(AggregatorFactory preAggregated)
{
if (this == preAggregated) {
return getCombiningFactory();
}
if (getClass() != preAggregated.getClass()) {
return null;
}
KllSketchAggregatorFactory<?, ?> that = (KllSketchAggregatorFactory<?, ?>) preAggregated;
if (Objects.equals(fieldName, that.fieldName) && k == that.k && maxStreamLength <= that.maxStreamLength) {
return getCombiningFactory();
}
return null;
}

@Override
public boolean equals(final Object o)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,25 @@ public byte[] getCacheKey()
return new CacheKeyBuilder(cacheTypeId).appendString(name).appendString(fieldName).appendInt(k).build();
}

@Nullable
@Override
public AggregatorFactory substituteCombiningFactory(AggregatorFactory preAggregated)
{
if (this == preAggregated) {
return getCombiningFactory();
}

if (getClass() != preAggregated.getClass()) {
return null;
}

DoublesSketchAggregatorFactory that = (DoublesSketchAggregatorFactory) preAggregated;
if (k <= that.k && maxStreamLength <= that.getMaxStreamLength() && Objects.equals(fieldName, that.fieldName)) {
return getCombiningFactory();
}
return null;
}

@Override
public boolean equals(Object o)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;

public abstract class SketchAggregatorFactory extends AggregatorFactory
{
Expand Down Expand Up @@ -266,6 +267,22 @@ public byte[] getCacheKey()
.array();
}

@Override
public AggregatorFactory substituteCombiningFactory(AggregatorFactory preAggregated)
{
if (this == preAggregated) {
return getCombiningFactory();
}
if (getClass() != preAggregated.getClass()) {
return null;
}
SketchMergeAggregatorFactory that = (SketchMergeAggregatorFactory) preAggregated;
if (Objects.equals(fieldName, that.fieldName) && size <= that.size) {
return getCombiningFactory();
}
return null;
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.druid.segment.column.ColumnType;

import javax.annotation.Nullable;
import java.util.Objects;

public class SketchMergeAggregatorFactory extends SketchAggregatorFactory
{
Expand Down Expand Up @@ -165,6 +166,25 @@ public AggregatorFactory withName(String newName)
);
}

@Override
public AggregatorFactory substituteCombiningFactory(AggregatorFactory preAggregated)
{
if (this == preAggregated) {
return getCombiningFactory();
}
if (getClass() != preAggregated.getClass()) {
return null;
}
SketchMergeAggregatorFactory that = (SketchMergeAggregatorFactory) preAggregated;
if (Objects.equals(fieldName, that.fieldName) &&
size <= that.size &&
isInputThetaSketch == that.isInputThetaSketch
) {
return getCombiningFactory();
}
return null;
}

@Override
public boolean equals(Object o)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,29 @@ public ColumnType getResultType()
return ColumnType.DOUBLE;
}

@Nullable
@Override
public AggregatorFactory substituteCombiningFactory(AggregatorFactory preAggregated)
{
if (this == preAggregated) {
return getCombiningFactory();
}

if (getClass() != preAggregated.getClass()) {
return null;
}

ArrayOfDoublesSketchAggregatorFactory that = (ArrayOfDoublesSketchAggregatorFactory) preAggregated;
if (nominalEntries <= that.nominalEntries &&
numberOfValues == that.numberOfValues &&
Objects.equals(fieldName, that.fieldName) &&
Objects.equals(metricColumns, that.metricColumns)
) {
return getCombiningFactory();
}
return null;
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,22 @@ public void testResultArraySignature()
new TimeseriesQueryQueryToolChest().resultArraySignature(query)
);
}

@Test
public void testCanSubstitute()
{
AggregatorFactory sketch = new KllDoublesSketchAggregatorFactory("sketch", "x", 200, null);
AggregatorFactory sketch2 = new KllDoublesSketchAggregatorFactory("other", "x", 200, null);
AggregatorFactory sketch3 = new KllDoublesSketchAggregatorFactory("sketch", "x", 200, 1_000L);
AggregatorFactory sketch4 = new KllDoublesSketchAggregatorFactory("sketch", "y", 200, null);
AggregatorFactory sketch5 = new KllDoublesSketchAggregatorFactory("sketch", "x", 300, null);

Assert.assertNotNull(sketch.substituteCombiningFactory(sketch2));
Assert.assertNotNull(sketch3.substituteCombiningFactory(sketch2));
Assert.assertNotNull(sketch3.substituteCombiningFactory(sketch));
Assert.assertNotNull(sketch2.substituteCombiningFactory(sketch));
Assert.assertNull(sketch.substituteCombiningFactory(sketch3));
Assert.assertNull(sketch.substituteCombiningFactory(sketch4));
Assert.assertNull(sketch.substituteCombiningFactory(sketch5));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,19 @@ public void testNullSketches()
ac.fold(new TestDoublesSketchColumnValueSelector());
Assert.assertNotNull(ac.getObject());
}

@Test
public void testCanSubstitute()
{
final DoublesSketchAggregatorFactory sketch = new DoublesSketchAggregatorFactory("sketch", "x", 1024, 1000L, null);
final DoublesSketchAggregatorFactory sketch2 = new DoublesSketchAggregatorFactory("other", "x", 1024, 2000L, null);
final DoublesSketchAggregatorFactory sketch3 = new DoublesSketchAggregatorFactory("another", "x", 2048, 1000L, null);
final DoublesSketchAggregatorFactory incompatible = new DoublesSketchAggregatorFactory("incompatible", "y", 1024, 1000L, null);

Assert.assertNotNull(sketch.substituteCombiningFactory(sketch2));
Assert.assertNotNull(sketch.substituteCombiningFactory(sketch3));
Assert.assertNull(sketch2.substituteCombiningFactory(sketch3));
Assert.assertNull(sketch.substituteCombiningFactory(incompatible));
Assert.assertNull(sketch3.substituteCombiningFactory(sketch));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
import org.apache.druid.query.aggregation.AggregatorAndSize;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.theta.oldapi.OldSketchBuildAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.theta.oldapi.OldSketchMergeAggregatorFactory;
Expand Down Expand Up @@ -213,4 +214,18 @@ public void testFactorizeVectorOnUnsupportedComplexColumn()
Throwable exception = Assert.assertThrows(DruidException.class, () -> AGGREGATOR_16384.factorizeVector(vectorFactory));
Assert.assertEquals("Unsupported input [x] of type [COMPLEX<json>] for aggregator [COMPLEX<thetaSketchBuild>].", exception.getMessage());
}

@Test
public void testCanSubstitute()
{
AggregatorFactory sketch1 = new SketchMergeAggregatorFactory("sketch", "x", 16, true, false, 2);
AggregatorFactory sketch2 = new SketchMergeAggregatorFactory("other", "x", null, false, false, null);
AggregatorFactory sketch3 = new SketchMergeAggregatorFactory("sketch", "x", null, false, false, 3);
AggregatorFactory sketch4 = new SketchMergeAggregatorFactory("sketch", "y", null, false, false, null);

Assert.assertNotNull(sketch1.substituteCombiningFactory(sketch2));
Assert.assertNotNull(sketch1.substituteCombiningFactory(sketch3));
Assert.assertNull(sketch1.substituteCombiningFactory(sketch4));
Assert.assertNull(sketch2.substituteCombiningFactory(sketch1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,21 @@ public void testWithName()
Assert.assertEquals(factory, factory.withName("name"));
Assert.assertEquals("newTest", factory.withName("newTest").getName());
}

@Test
public void testCanSubstitute()
{
AggregatorFactory sketch = new ArrayOfDoublesSketchAggregatorFactory("sketch", "x", null, null, null);
AggregatorFactory sketch2 = new ArrayOfDoublesSketchAggregatorFactory("sketch2", "x", null, null, null);
AggregatorFactory other = new ArrayOfDoublesSketchAggregatorFactory("other", "x", 8192, null, null);
AggregatorFactory incompatible = new ArrayOfDoublesSketchAggregatorFactory("incompatible", "x", 2048, null, null);
AggregatorFactory incompatible2 = new ArrayOfDoublesSketchAggregatorFactory("sketch", "y", null, null, null);
Assert.assertNotNull(sketch.substituteCombiningFactory(other));
Assert.assertNotNull(sketch.substituteCombiningFactory(sketch2));
Assert.assertNull(sketch.substituteCombiningFactory(incompatible));
Assert.assertNotNull(sketch.substituteCombiningFactory(sketch));
Assert.assertNull(other.substituteCombiningFactory(sketch));
Assert.assertNull(sketch.substituteCombiningFactory(incompatible2));
Assert.assertNull(other.substituteCombiningFactory(incompatible2));
}
}
Loading

0 comments on commit a6236c3

Please sign in to comment.