Skip to content

Commit

Permalink
use datasketches-java 4.2.0 (apache#15257)
Browse files Browse the repository at this point in the history
* use datasketches-java 4.2.0

* use exclusive mode

* fixed issues raised by CodeQL

* fixed issue raised by spotbugs

* fixed issues raised by intellij

* added missing import

* Update QuantilesSketchKeyCollector search mode and adjust tests.

* Update sizeOf functions and add unit tests

* Add unit tests

---------

Co-authored-by: AlexanderSaydakov <[email protected]>
Co-authored-by: Gian Merlino <[email protected]>
Co-authored-by: Adarsh Sanjeev <[email protected]>
  • Loading branch information
4 people authored and LakshSingla committed Oct 27, 2023
1 parent f105704 commit a543b31
Show file tree
Hide file tree
Showing 16 changed files with 359 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.datasketches.kll.KllDoublesSketch;
import org.apache.datasketches.kll.KllSketch;
import org.apache.datasketches.kll.KllSketch.SketchType;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import org.apache.druid.query.aggregation.AggregatorUtil;
Expand Down Expand Up @@ -124,7 +126,7 @@ Class<KllDoublesSketch> getSketchClass()
@Override
int getMaxSerializedSizeBytes(final int k, final long n)
{
return KllDoublesSketch.getMaxSerializedSizeBytes(k, n, true);
return KllSketch.getMaxSerializedSizeBytes(k, n, SketchType.DOUBLES_SKETCH, true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.datasketches.kll.KllFloatsSketch;
import org.apache.datasketches.kll.KllSketch;
import org.apache.datasketches.kll.KllSketch.SketchType;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import org.apache.druid.query.aggregation.AggregatorUtil;
Expand Down Expand Up @@ -124,7 +126,7 @@ Class<KllFloatsSketch> getSketchClass()
@Override
int getMaxSerializedSizeBytes(final int k, final long n)
{
return KllFloatsSketch.getMaxSerializedSizeBytes(k, n, true);
return KllSketch.getMaxSerializedSizeBytes(k, n, SketchType.FLOATS_SKETCH, true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void testSafeRead()
objectStrategy.fromByteBufferSafe(buf, bytes.length).toByteArray();

// corrupted sketch should fail with a regular java buffer exception, not all subsets actually fail with the same
// index out of bounds exceptions, but at least this many do
// sketches exceptions, but at least this many do
for (int subset = 3; subset < 24; subset++) {
final byte[] garbage2 = new byte[subset];
for (int i = 0; i < garbage2.length; i++) {
Expand All @@ -123,7 +123,7 @@ public void testSafeRead()

final ByteBuffer buf2 = ByteBuffer.wrap(garbage2).order(ByteOrder.LITTLE_ENDIAN);
Assert.assertThrows(
IndexOutOfBoundsException.class,
Exception.class,
() -> objectStrategy.fromByteBufferSafe(buf2, garbage2.length).toByteArray()
);
}
Expand All @@ -132,7 +132,7 @@ public void testSafeRead()
final byte[] garbage = new byte[]{0x01, 0x02};
final ByteBuffer buf3 = ByteBuffer.wrap(garbage).order(ByteOrder.LITTLE_ENDIAN);
Assert.assertThrows(
IndexOutOfBoundsException.class,
Exception.class,
() -> objectStrategy.fromByteBufferSafe(buf3, garbage.length).toByteArray()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void testSafeRead()

final ByteBuffer buf2 = ByteBuffer.wrap(garbage2).order(ByteOrder.LITTLE_ENDIAN);
Assert.assertThrows(
IndexOutOfBoundsException.class,
Exception.class,
() -> objectStrategy.fromByteBufferSafe(buf2, garbage2.length).toByteArray()
);
}
Expand All @@ -132,7 +132,7 @@ public void testSafeRead()
final byte[] garbage = new byte[]{0x01, 0x02};
final ByteBuffer buf3 = ByteBuffer.wrap(garbage).order(ByteOrder.LITTLE_ENDIAN);
Assert.assertThrows(
IndexOutOfBoundsException.class,
Exception.class,
() -> objectStrategy.fromByteBufferSafe(buf3, garbage.length).toByteArray()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.primitives.Ints;
import org.apache.datasketches.quantiles.ItemsSketch;
import org.apache.datasketches.quantiles.ItemsUnion;
import org.apache.datasketches.quantilescommon.QuantileSearchCriteria;
import org.apache.druid.frame.key.ClusterByPartition;
import org.apache.druid.frame.key.ClusterByPartitions;
import org.apache.druid.frame.key.RowKey;
Expand Down Expand Up @@ -149,7 +150,7 @@ public ClusterByPartitions generatePartitionsWithTargetWeight(final long targetW

final int numPartitions = Ints.checkedCast(LongMath.divide(sketch.getN(), targetWeight, RoundingMode.CEILING));

final byte[][] quantiles = (sketch.getPartitionBoundaries(numPartitions)).boundaries;
final byte[][] quantiles = (sketch.getPartitionBoundaries(numPartitions, QuantileSearchCriteria.EXCLUSIVE)).boundaries;
final List<ClusterByPartition> partitions = new ArrayList<>();

for (int i = 0; i < numPartitions; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.google.common.annotations.VisibleForTesting;
import org.apache.datasketches.common.ArrayOfItemsSerDe;
import org.apache.datasketches.common.ByteArrayUtil;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.quantiles.ItemsSketch;
Expand All @@ -32,6 +33,7 @@

import java.io.IOException;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Comparator;

public class QuantilesSketchKeyCollectorFactory
Expand Down Expand Up @@ -91,9 +93,9 @@ public QuantilesSketchKeyCollector fromSnapshot(QuantilesSketchKeyCollectorSnaps
return new QuantilesSketchKeyCollector(comparator, sketch, snapshot.getAverageKeyLength());
}

private static class ByteRowKeySerde extends ArrayOfItemsSerDe<byte[]>
static class ByteRowKeySerde extends ArrayOfItemsSerDe<byte[]>
{
private static final ByteRowKeySerde INSTANCE = new ByteRowKeySerde();
static final ByteRowKeySerde INSTANCE = new ByteRowKeySerde();

private ByteRowKeySerde()
{
Expand Down Expand Up @@ -126,22 +128,66 @@ public byte[] serializeToByteArray(final byte[][] items)
}

@Override
public byte[][] deserializeFromMemory(final Memory mem, final int numItems)
public byte[][] deserializeFromMemory(final Memory mem, long offsetBytes, final int numItems)
{
final byte[][] keys = new byte[numItems][];
long keyPosition = (long) Integer.BYTES * numItems;
final long start = offsetBytes;
offsetBytes += (long) Integer.BYTES * numItems;

for (int i = 0; i < numItems; i++) {
final int keyLength = mem.getInt((long) Integer.BYTES * i);
final int keyLength = mem.getInt(start + (long) Integer.BYTES * i);
final byte[] keyBytes = new byte[keyLength];

mem.getByteArray(keyPosition, keyBytes, 0, keyLength);
mem.getByteArray(offsetBytes, keyBytes, 0, keyLength);
keys[i] = keyBytes;

keyPosition += keyLength;
offsetBytes += keyLength;
}

return keys;
}

@Override
public byte[] serializeToByteArray(final byte[] item)
{
final byte[] bytes = new byte[Integer.BYTES + item.length];
ByteArrayUtil.putIntLE(bytes, 0, item.length);
ByteArrayUtil.copyBytes(item, 0, bytes, Integer.BYTES, item.length);
return bytes;
}

@Override
public byte[][] deserializeFromMemory(final Memory mem, final int numItems)
{
return deserializeFromMemory(mem, 0, numItems);
}

@Override
public int sizeOf(final byte[] item)
{
return Integer.BYTES + item.length;
}

@Override
public int sizeOf(final Memory mem, long offsetBytes, final int numItems)
{
int length = Integer.BYTES * numItems;
for (int i = 0; i < numItems; i++) {
length += mem.getInt(offsetBytes + (long) Integer.BYTES * i);
}
return length;
}

@Override
public String toString(final byte[] item)
{
return Arrays.toString(item);
}

@Override
public Class<?> getClassOfT()
{
return byte[].class;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public void testSelectOnFoo()
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with()
.rows(isPageSizeLimited() ? new long[]{1, 2, 3} : new long[]{6})
.rows(isPageSizeLimited() ? new long[]{2, 2, 2} : new long[]{6})
.frames(isPageSizeLimited() ? new long[]{1, 1, 1} : new long[]{1}),
0, 0, "shuffle"
)
Expand Down Expand Up @@ -290,7 +290,9 @@ public void testSelectOnFoo2()
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(3).frames(1),
.with()
.rows(isPageSizeLimited() ? new long[]{1L, 2L} : new long[]{3L})
.frames(isPageSizeLimited() ? new long[]{1L, 1L} : new long[]{1L}),
0, 0, "shuffle"
)
.verifyResults();
Expand Down Expand Up @@ -353,7 +355,7 @@ public void testSelectOnFooDuplicateColumnNames()
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with()
.rows(isPageSizeLimited() ? new long[]{1, 2, 3} : new long[]{6})
.rows(isPageSizeLimited() ? new long[]{2, 2, 2} : new long[]{6})
.frames(isPageSizeLimited() ? new long[]{1, 1, 1} : new long[]{1}),
0, 0, "shuffle"
)
Expand Down Expand Up @@ -1442,8 +1444,8 @@ public void testExternSelectWithMultipleWorkers() throws IOException
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with()
.rows(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 2L} : new long[]{5L})
.frames(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 1L} : new long[]{1L}),
.rows(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 1L, 1L} : new long[]{5L})
.frames(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 1L, 1L} : new long[]{1L}),
0, 0, "shuffle"
)
.setExpectedCountersForStageWorkerChannel(
Expand All @@ -1459,27 +1461,27 @@ public void testExternSelectWithMultipleWorkers() throws IOException
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with()
.rows(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 2L} : new long[]{5L})
.frames(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 1L} : new long[]{1L}),
.rows(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 1L, 1L} : new long[]{5L})
.frames(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 1L, 1L} : new long[]{1L}),
0, 1, "shuffle"
);
// adding result stage counter checks
if (isPageSizeLimited()) {
selectTester.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(2, 0, 2),
.with().rows(2, 0, 2, 0, 2),
1, 0, "input0"
).setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(2, 0, 2),
.with().rows(2, 0, 2, 0, 2),
1, 0, "output"
).setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(0, 2, 0, 4),
.with().rows(0, 2, 0, 2),
1, 1, "input0"
).setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(0, 2, 0, 4),
.with().rows(0, 2, 0, 2),
1, 1, "output"
);
}
Expand Down Expand Up @@ -1600,7 +1602,9 @@ public void testSelectOnUserDefinedSourceContainingWith()
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(3).frames(1),
.with()
.rows(isPageSizeLimited() ? new long[]{1, 2} : new long[]{3})
.frames(isPageSizeLimited() ? new long[]{1, 1} : new long[]{1}),
0, 0, "shuffle"
)
.verifyResults();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,9 +325,9 @@ public void testWithDurableStorage() throws IOException
).getEntity();

Assert.assertEquals(ImmutableList.of(
new PageInformation(0, 1L, 75L, 0, 0),
new PageInformation(1, 2L, 121L, 0, 1),
new PageInformation(2, 3L, 164L, 0, 2)
new PageInformation(0, 2L, 120L, 0, 0),
new PageInformation(1, 2L, 118L, 0, 1),
new PageInformation(2, 2L, 122L, 0, 2)
), sqlStatementResult.getResultSetInformation().getPages());

assertExpectedResults(
Expand All @@ -348,7 +348,9 @@ public void testWithDurableStorage() throws IOException
);

assertExpectedResults(
"{\"cnt\":1,\"dim1\":\"\"}\n\n",
"{\"cnt\":1,\"dim1\":\"\"}\n"
+ "{\"cnt\":1,\"dim1\":\"10.1\"}\n"
+ "\n",
resource.doGetResults(
sqlStatementResult.getQueryId(),
0L,
Expand All @@ -359,8 +361,7 @@ public void testWithDurableStorage() throws IOException
);

assertExpectedResults(
"{\"cnt\":1,\"dim1\":\"1\"}\n"
+ "{\"cnt\":1,\"dim1\":\"def\"}\n"
"{\"cnt\":1,\"dim1\":\"def\"}\n"
+ "{\"cnt\":1,\"dim1\":\"abc\"}\n"
+ "\n",
resource.doGetResults(
Expand Down Expand Up @@ -412,7 +413,8 @@ public void testMultipleWorkersWithPageSizeLimiting() throws IOException
new PageInformation(0, 2L, 128L, 0, 0),
new PageInformation(1, 2L, 132L, 1, 1),
new PageInformation(2, 2L, 128L, 0, 2),
new PageInformation(3, 4L, 228L, 1, 3)
new PageInformation(3, 2L, 132L, 1, 3),
new PageInformation(4, 2L, 130L, 0, 4)
), sqlStatementResult.getResultSetInformation().getPages());


Expand Down Expand Up @@ -457,12 +459,19 @@ public void testMultipleWorkersWithPageSizeLimiting() throws IOException
SqlStatementResourceTest.makeOkRequest()
)));

Assert.assertEquals(rows.subList(6, 10), SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults(
Assert.assertEquals(rows.subList(6, 8), SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults(
sqlStatementResult.getQueryId(),
3L,
ResultFormat.ARRAY.name(),
SqlStatementResourceTest.makeOkRequest()
)));

Assert.assertEquals(rows.subList(8, 10), SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults(
sqlStatementResult.getQueryId(),
4L,
ResultFormat.ARRAY.name(),
SqlStatementResourceTest.makeOkRequest()
)));
}

@Test
Expand Down
Loading

0 comments on commit a543b31

Please sign in to comment.