diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactory.java index 815227adf55b..267953e23647 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactory.java @@ -22,6 +22,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.datasketches.kll.KllDoublesSketch; +import org.apache.datasketches.kll.KllSketch; +import org.apache.datasketches.kll.KllSketch.SketchType; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException; import org.apache.druid.query.aggregation.AggregatorUtil; @@ -124,7 +126,7 @@ Class getSketchClass() @Override int getMaxSerializedSizeBytes(final int k, final long n) { - return KllDoublesSketch.getMaxSerializedSizeBytes(k, n, true); + return KllSketch.getMaxSerializedSizeBytes(k, n, SketchType.DOUBLES_SKETCH, true); } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchAggregatorFactory.java index 9cc61524615c..bdd672ab1257 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchAggregatorFactory.java @@ -22,6 +22,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.datasketches.kll.KllFloatsSketch; +import org.apache.datasketches.kll.KllSketch; +import org.apache.datasketches.kll.KllSketch.SketchType; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException; import org.apache.druid.query.aggregation.AggregatorUtil; @@ -124,7 +126,7 @@ Class getSketchClass() @Override int getMaxSerializedSizeBytes(final int k, final long n) { - return KllFloatsSketch.getMaxSerializedSizeBytes(k, n, true); + return KllSketch.getMaxSerializedSizeBytes(k, n, SketchType.FLOATS_SKETCH, true); } @Override diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchComplexMetricSerdeTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchComplexMetricSerdeTest.java index d0a263079906..730fb54c541d 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchComplexMetricSerdeTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchComplexMetricSerdeTest.java @@ -114,7 +114,7 @@ public void testSafeRead() objectStrategy.fromByteBufferSafe(buf, bytes.length).toByteArray(); // corrupted sketch should fail with a regular java buffer exception, not all subsets actually fail with the same - // index out of bounds exceptions, but at least this many do + // sketches exceptions, but at least this many do for (int subset = 3; subset < 24; subset++) { final byte[] garbage2 = new byte[subset]; for (int i = 0; i < garbage2.length; i++) { @@ -123,7 +123,7 @@ public void testSafeRead() final ByteBuffer buf2 = ByteBuffer.wrap(garbage2).order(ByteOrder.LITTLE_ENDIAN); Assert.assertThrows( - IndexOutOfBoundsException.class, + Exception.class, () -> objectStrategy.fromByteBufferSafe(buf2, garbage2.length).toByteArray() ); } @@ -132,7 +132,7 @@ public void testSafeRead() final byte[] garbage = new byte[]{0x01, 0x02}; final ByteBuffer buf3 = ByteBuffer.wrap(garbage).order(ByteOrder.LITTLE_ENDIAN); Assert.assertThrows( - IndexOutOfBoundsException.class, + Exception.class, () -> objectStrategy.fromByteBufferSafe(buf3, garbage.length).toByteArray() ); } diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchComplexMetricSerdeTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchComplexMetricSerdeTest.java index 56a397789906..ee505fe65b88 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchComplexMetricSerdeTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchComplexMetricSerdeTest.java @@ -123,7 +123,7 @@ public void testSafeRead() final ByteBuffer buf2 = ByteBuffer.wrap(garbage2).order(ByteOrder.LITTLE_ENDIAN); Assert.assertThrows( - IndexOutOfBoundsException.class, + Exception.class, () -> objectStrategy.fromByteBufferSafe(buf2, garbage2.length).toByteArray() ); } @@ -132,7 +132,7 @@ public void testSafeRead() final byte[] garbage = new byte[]{0x01, 0x02}; final ByteBuffer buf3 = ByteBuffer.wrap(garbage).order(ByteOrder.LITTLE_ENDIAN); Assert.assertThrows( - IndexOutOfBoundsException.class, + Exception.class, () -> objectStrategy.fromByteBufferSafe(buf3, garbage.length).toByteArray() ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollector.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollector.java index 607265367c2c..a20ff40cc870 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollector.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollector.java @@ -23,6 +23,7 @@ import com.google.common.primitives.Ints; import org.apache.datasketches.quantiles.ItemsSketch; import org.apache.datasketches.quantiles.ItemsUnion; +import org.apache.datasketches.quantilescommon.QuantileSearchCriteria; import org.apache.druid.frame.key.ClusterByPartition; import org.apache.druid.frame.key.ClusterByPartitions; import org.apache.druid.frame.key.RowKey; @@ -149,7 +150,7 @@ public ClusterByPartitions generatePartitionsWithTargetWeight(final long targetW final int numPartitions = Ints.checkedCast(LongMath.divide(sketch.getN(), targetWeight, RoundingMode.CEILING)); - final byte[][] quantiles = (sketch.getPartitionBoundaries(numPartitions)).boundaries; + final byte[][] quantiles = (sketch.getPartitionBoundaries(numPartitions, QuantileSearchCriteria.EXCLUSIVE)).boundaries; final List partitions = new ArrayList<>(); for (int i = 0; i < numPartitions; i++) { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java index 3192813cfe1a..674dfe15acbb 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.JsonDeserializer; import com.google.common.annotations.VisibleForTesting; import org.apache.datasketches.common.ArrayOfItemsSerDe; +import org.apache.datasketches.common.ByteArrayUtil; import org.apache.datasketches.memory.Memory; import org.apache.datasketches.memory.WritableMemory; import org.apache.datasketches.quantiles.ItemsSketch; @@ -32,6 +33,7 @@ import java.io.IOException; import java.nio.ByteOrder; +import java.util.Arrays; import java.util.Comparator; public class QuantilesSketchKeyCollectorFactory @@ -91,9 +93,9 @@ public QuantilesSketchKeyCollector fromSnapshot(QuantilesSketchKeyCollectorSnaps return new QuantilesSketchKeyCollector(comparator, sketch, snapshot.getAverageKeyLength()); } - private static class ByteRowKeySerde extends ArrayOfItemsSerDe + static class ByteRowKeySerde extends ArrayOfItemsSerDe { - private static final ByteRowKeySerde INSTANCE = new ByteRowKeySerde(); + static final ByteRowKeySerde INSTANCE = new ByteRowKeySerde(); private ByteRowKeySerde() { @@ -126,22 +128,66 @@ public byte[] serializeToByteArray(final byte[][] items) } @Override - public byte[][] deserializeFromMemory(final Memory mem, final int numItems) + public byte[][] deserializeFromMemory(final Memory mem, long offsetBytes, final int numItems) { final byte[][] keys = new byte[numItems][]; - long keyPosition = (long) Integer.BYTES * numItems; + final long start = offsetBytes; + offsetBytes += (long) Integer.BYTES * numItems; for (int i = 0; i < numItems; i++) { - final int keyLength = mem.getInt((long) Integer.BYTES * i); + final int keyLength = mem.getInt(start + (long) Integer.BYTES * i); final byte[] keyBytes = new byte[keyLength]; - mem.getByteArray(keyPosition, keyBytes, 0, keyLength); + mem.getByteArray(offsetBytes, keyBytes, 0, keyLength); keys[i] = keyBytes; - keyPosition += keyLength; + offsetBytes += keyLength; } return keys; } + + @Override + public byte[] serializeToByteArray(final byte[] item) + { + final byte[] bytes = new byte[Integer.BYTES + item.length]; + ByteArrayUtil.putIntLE(bytes, 0, item.length); + ByteArrayUtil.copyBytes(item, 0, bytes, Integer.BYTES, item.length); + return bytes; + } + + @Override + public byte[][] deserializeFromMemory(final Memory mem, final int numItems) + { + return deserializeFromMemory(mem, 0, numItems); + } + + @Override + public int sizeOf(final byte[] item) + { + return Integer.BYTES + item.length; + } + + @Override + public int sizeOf(final Memory mem, long offsetBytes, final int numItems) + { + int length = Integer.BYTES * numItems; + for (int i = 0; i < numItems; i++) { + length += mem.getInt(offsetBytes + (long) Integer.BYTES * i); + } + return length; + } + + @Override + public String toString(final byte[] item) + { + return Arrays.toString(item); + } + + @Override + public Class getClassOfT() + { + return byte[].class; + } } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index b63ee479e202..441c98b91d8b 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -226,7 +226,7 @@ public void testSelectOnFoo() .setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher .with() - .rows(isPageSizeLimited() ? new long[]{1, 2, 3} : new long[]{6}) + .rows(isPageSizeLimited() ? new long[]{2, 2, 2} : new long[]{6}) .frames(isPageSizeLimited() ? new long[]{1, 1, 1} : new long[]{1}), 0, 0, "shuffle" ) @@ -290,7 +290,9 @@ public void testSelectOnFoo2() ) .setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher - .with().rows(3).frames(1), + .with() + .rows(isPageSizeLimited() ? new long[]{1L, 2L} : new long[]{3L}) + .frames(isPageSizeLimited() ? new long[]{1L, 1L} : new long[]{1L}), 0, 0, "shuffle" ) .verifyResults(); @@ -353,7 +355,7 @@ public void testSelectOnFooDuplicateColumnNames() .setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher .with() - .rows(isPageSizeLimited() ? new long[]{1, 2, 3} : new long[]{6}) + .rows(isPageSizeLimited() ? new long[]{2, 2, 2} : new long[]{6}) .frames(isPageSizeLimited() ? new long[]{1, 1, 1} : new long[]{1}), 0, 0, "shuffle" ) @@ -1442,8 +1444,8 @@ public void testExternSelectWithMultipleWorkers() throws IOException .setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher .with() - .rows(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 2L} : new long[]{5L}) - .frames(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 1L} : new long[]{1L}), + .rows(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 1L, 1L} : new long[]{5L}) + .frames(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 1L, 1L} : new long[]{1L}), 0, 0, "shuffle" ) .setExpectedCountersForStageWorkerChannel( @@ -1459,27 +1461,27 @@ public void testExternSelectWithMultipleWorkers() throws IOException .setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher .with() - .rows(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 2L} : new long[]{5L}) - .frames(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 1L} : new long[]{1L}), + .rows(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 1L, 1L} : new long[]{5L}) + .frames(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 1L, 1L} : new long[]{1L}), 0, 1, "shuffle" ); // adding result stage counter checks if (isPageSizeLimited()) { selectTester.setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher - .with().rows(2, 0, 2), + .with().rows(2, 0, 2, 0, 2), 1, 0, "input0" ).setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher - .with().rows(2, 0, 2), + .with().rows(2, 0, 2, 0, 2), 1, 0, "output" ).setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher - .with().rows(0, 2, 0, 4), + .with().rows(0, 2, 0, 2), 1, 1, "input0" ).setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher - .with().rows(0, 2, 0, 4), + .with().rows(0, 2, 0, 2), 1, 1, "output" ); } @@ -1600,7 +1602,9 @@ public void testSelectOnUserDefinedSourceContainingWith() ) .setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher - .with().rows(3).frames(1), + .with() + .rows(isPageSizeLimited() ? new long[]{1, 2} : new long[]{3}) + .frames(isPageSizeLimited() ? new long[]{1, 1} : new long[]{1}), 0, 0, "shuffle" ) .verifyResults(); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java index 6650c7785555..070b3ae46a71 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java @@ -325,9 +325,9 @@ public void testWithDurableStorage() throws IOException ).getEntity(); Assert.assertEquals(ImmutableList.of( - new PageInformation(0, 1L, 75L, 0, 0), - new PageInformation(1, 2L, 121L, 0, 1), - new PageInformation(2, 3L, 164L, 0, 2) + new PageInformation(0, 2L, 120L, 0, 0), + new PageInformation(1, 2L, 118L, 0, 1), + new PageInformation(2, 2L, 122L, 0, 2) ), sqlStatementResult.getResultSetInformation().getPages()); assertExpectedResults( @@ -348,7 +348,9 @@ public void testWithDurableStorage() throws IOException ); assertExpectedResults( - "{\"cnt\":1,\"dim1\":\"\"}\n\n", + "{\"cnt\":1,\"dim1\":\"\"}\n" + + "{\"cnt\":1,\"dim1\":\"10.1\"}\n" + + "\n", resource.doGetResults( sqlStatementResult.getQueryId(), 0L, @@ -359,8 +361,7 @@ public void testWithDurableStorage() throws IOException ); assertExpectedResults( - "{\"cnt\":1,\"dim1\":\"1\"}\n" - + "{\"cnt\":1,\"dim1\":\"def\"}\n" + "{\"cnt\":1,\"dim1\":\"def\"}\n" + "{\"cnt\":1,\"dim1\":\"abc\"}\n" + "\n", resource.doGetResults( @@ -412,7 +413,8 @@ public void testMultipleWorkersWithPageSizeLimiting() throws IOException new PageInformation(0, 2L, 128L, 0, 0), new PageInformation(1, 2L, 132L, 1, 1), new PageInformation(2, 2L, 128L, 0, 2), - new PageInformation(3, 4L, 228L, 1, 3) + new PageInformation(3, 2L, 132L, 1, 3), + new PageInformation(4, 2L, 130L, 0, 4) ), sqlStatementResult.getResultSetInformation().getPages()); @@ -457,12 +459,19 @@ public void testMultipleWorkersWithPageSizeLimiting() throws IOException SqlStatementResourceTest.makeOkRequest() ))); - Assert.assertEquals(rows.subList(6, 10), SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults( + Assert.assertEquals(rows.subList(6, 8), SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults( sqlStatementResult.getQueryId(), 3L, ResultFormat.ARRAY.name(), SqlStatementResourceTest.makeOkRequest() ))); + + Assert.assertEquals(rows.subList(8, 10), SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults( + sqlStatementResult.getQueryId(), + 4L, + ResultFormat.ARRAY.name(), + SqlStatementResourceTest.makeOkRequest() + ))); } @Test diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/statistics/ByteRowKeySerdeTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/statistics/ByteRowKeySerdeTest.java new file mode 100644 index 000000000000..9226ab4c81f0 --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/statistics/ByteRowKeySerdeTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.statistics; + +import org.apache.datasketches.memory.Memory; +import org.apache.druid.frame.key.KeyTestUtils; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Assert; +import org.junit.Test; + +public class ByteRowKeySerdeTest extends InitializedNullHandlingTest +{ + private final QuantilesSketchKeyCollectorFactory.ByteRowKeySerde serde = + QuantilesSketchKeyCollectorFactory.ByteRowKeySerde.INSTANCE; + + @Test + public void testByteArraySerde() + { + testSerde(new byte[]{1, 5, 9, 3}); + testSerde(new byte[][]{new byte[]{1, 5}, new byte[]{2, 3}, new byte[]{6, 7}}); + } + + @Test + public void testSerdeWithRowKeys() + { + RowSignature rowSignature = RowSignature.builder() + .add("x", ColumnType.LONG) + .add("y", ColumnType.LONG) + .build(); + + testSerde(KeyTestUtils.createKey(rowSignature, 2, 4).array()); + } + + @Test + public void testEmptyArray() + { + testSerde(new byte[][]{}); + testSerde(new byte[][]{new byte[]{1, 5}, new byte[]{}, new byte[]{2, 3}}); + } + + private void testSerde(byte[] byteRowKey) + { + byte[] bytes = serde.serializeToByteArray(byteRowKey); + Assert.assertEquals(serde.sizeOf(byteRowKey), bytes.length); + + Memory wrappedMemory = Memory.wrap(bytes); + Assert.assertEquals(serde.sizeOf(wrappedMemory, 0, 1), bytes.length); + + byte[][] deserialized = serde.deserializeFromMemory(wrappedMemory, 1); + Assert.assertArrayEquals(new byte[][]{byteRowKey}, deserialized); + } + + private void testSerde(byte[][] inputArray) + { + byte[] bytes = serde.serializeToByteArray(inputArray); + Assert.assertEquals(serde.sizeOf(inputArray), bytes.length); + + Memory wrappedMemory = Memory.wrap(bytes); + Assert.assertEquals(serde.sizeOf(wrappedMemory, 0, inputArray.length), bytes.length); + + byte[][] deserialized = serde.deserializeFromMemory(wrappedMemory, inputArray.length); + Assert.assertArrayEquals(inputArray, deserialized); + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java index e3f76b5b92fd..8bba9f65aafd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java @@ -21,8 +21,9 @@ import org.apache.datasketches.common.ArrayOfItemsSerDe; import org.apache.datasketches.common.ArrayOfStringsSerDe; +import org.apache.datasketches.common.ByteArrayUtil; +import org.apache.datasketches.common.Util; import org.apache.datasketches.memory.Memory; -import org.apache.datasketches.memory.WritableMemory; import org.apache.datasketches.memory.internal.UnsafeUtil; import org.apache.druid.data.input.StringTuple; @@ -36,7 +37,7 @@ public class ArrayOfStringTuplesSerDe extends ArrayOfItemsSerDe private static final ArrayOfStringsNullSafeSerde STRINGS_SERDE = new ArrayOfStringsNullSafeSerde(); @Override - public byte[] serializeToByteArray(StringTuple[] items) + public byte[] serializeToByteArray(final StringTuple[] items) { int length = 0; final byte[][] itemsBytes = new byte[items.length][]; @@ -49,29 +50,27 @@ public byte[] serializeToByteArray(StringTuple[] items) } final byte[] bytes = new byte[length]; - final WritableMemory mem = WritableMemory.writableWrap(bytes); - long offsetBytes = 0; + int offsetBytes = 0; for (int i = 0; i < items.length; i++) { // Add the number of items in the StringTuple - mem.putInt(offsetBytes, items[i].size()); + ByteArrayUtil.putIntLE(bytes, offsetBytes, items[i].size()); offsetBytes += Integer.BYTES; // Add the size of byte content for the StringTuple - mem.putInt(offsetBytes, itemsBytes[i].length); + ByteArrayUtil.putIntLE(bytes, offsetBytes, itemsBytes[i].length); offsetBytes += Integer.BYTES; // Add the byte contents of the StringTuple - mem.putByteArray(offsetBytes, itemsBytes[i], 0, itemsBytes[i].length); + ByteArrayUtil.copyBytes(itemsBytes[i], 0, bytes, offsetBytes, itemsBytes[i].length); offsetBytes += itemsBytes[i].length; } return bytes; } @Override - public StringTuple[] deserializeFromMemory(Memory mem, int numItems) + public StringTuple[] deserializeFromMemory(final Memory mem, long offsetBytes, final int numItems) { final StringTuple[] array = new StringTuple[numItems]; - long offsetBytes = 0; for (int i = 0; i < numItems; i++) { // Read the number of items in the StringTuple UnsafeUtil.checkBounds(offsetBytes, Integer.BYTES, mem.getCapacity()); @@ -96,4 +95,68 @@ public StringTuple[] deserializeFromMemory(Memory mem, int numItems) } return array; } + + @Override + public byte[] serializeToByteArray(final StringTuple item) + { + final byte[] itemBytes = STRINGS_SERDE.serializeToByteArray(item.toArray()); + final byte[] bytes = new byte[Integer.BYTES * 2 + itemBytes.length]; + int offsetBytes = 0; + ByteArrayUtil.putIntLE(bytes, offsetBytes, item.size()); + offsetBytes += Integer.BYTES; + ByteArrayUtil.putIntLE(bytes, offsetBytes, itemBytes.length); + offsetBytes += Integer.BYTES; + ByteArrayUtil.copyBytes(itemBytes, 0, bytes, offsetBytes, itemBytes.length); + return bytes; + } + + @Override + public StringTuple[] deserializeFromMemory(final Memory mem, final int numItems) + { + return deserializeFromMemory(mem, 0, numItems); + } + + @Override + public int sizeOf(final StringTuple item) + { + // Two integers to store number of strings in the tuple and the size of the byte array + int length = 2 * Integer.BYTES; + for (final String s : item.toArray()) { + length += STRINGS_SERDE.sizeOf(s); + } + return length; + } + + @Override + public int sizeOf(final Memory mem, long offsetBytes, final int numItems) + { + final long start = offsetBytes; + for (int i = 0; i < numItems; i++) { + // Skip the number of items in the StringTuple + Util.checkBounds(offsetBytes, Integer.BYTES, mem.getCapacity()); + offsetBytes += Integer.BYTES; + + // Read the size of byte content + Util.checkBounds(offsetBytes, Integer.BYTES, mem.getCapacity()); + final int byteContentSize = mem.getInt(offsetBytes); + offsetBytes += Integer.BYTES; + + // Skip the byte content + Util.checkBounds(offsetBytes, byteContentSize, mem.getCapacity()); + offsetBytes += byteContentSize; + } + return (int) (offsetBytes - start); + } + + @Override + public String toString(final StringTuple item) + { + return item.toString(); + } + + @Override + public Class getClassOfT() + { + return StringTuple.class; + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java index b5a8393b1724..cd1057417170 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java @@ -19,7 +19,6 @@ package org.apache.druid.indexing.common.task.batch.parallel.distribution; -import org.apache.datasketches.common.ArrayOfItemsSerDe; import org.apache.datasketches.common.ArrayOfStringsSerDe; import org.apache.datasketches.common.Util; import org.apache.datasketches.memory.Memory; @@ -35,7 +34,7 @@ * The implementation is the same as {@link ArrayOfStringsSerDe}, except this * class handles null String values as well. */ -public class ArrayOfStringsNullSafeSerde extends ArrayOfItemsSerDe +public class ArrayOfStringsNullSafeSerde extends ArrayOfStringsSerDe { private static final int NULL_STRING_LENGTH = -1; @@ -106,5 +105,14 @@ public String[] deserializeFromMemory(final Memory mem, final int numItems) return array; } + @Override + public int sizeOf(String item) + { + if (item == null) { + return Integer.BYTES; + } else { + return super.sizeOf(item); + } + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java index e47fcf78c135..e2794f6b1caf 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java @@ -33,6 +33,7 @@ import com.google.common.base.Preconditions; import org.apache.datasketches.memory.Memory; import org.apache.datasketches.quantiles.ItemsSketch; +import org.apache.datasketches.quantilescommon.QuantileSearchCriteria; import org.apache.druid.data.input.StringTuple; import org.apache.druid.timeline.partition.PartitionBoundaries; @@ -128,7 +129,7 @@ private PartitionBoundaries getEvenPartitionsByCount(int evenPartitionCount) if (delegate.isEmpty()) { return new PartitionBoundaries(new StringTuple[0]); } - return new PartitionBoundaries((delegate.getPartitionBoundaries(evenPartitionCount)).boundaries); + return new PartitionBoundaries((delegate.getPartitionBoundaries(evenPartitionCount, QuantileSearchCriteria.EXCLUSIVE)).boundaries); } @Override diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDeTest.java new file mode 100644 index 000000000000..9b909948ad04 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDeTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task.batch.parallel.distribution; + +import org.apache.datasketches.memory.Memory; +import org.apache.druid.data.input.StringTuple; +import org.junit.Assert; +import org.junit.Test; + +public class ArrayOfStringTuplesSerDeTest +{ + + private final ArrayOfStringTuplesSerDe serde = new ArrayOfStringTuplesSerDe(); + + @Test + public void testStringTupleSerde() + { + testSerde(StringTuple.create("abc")); + testSerde(StringTuple.create("abc", "def", "xyz")); + testSerde(new StringTuple[]{StringTuple.create("abc"), StringTuple.create("def", "efg"), StringTuple.create("z")}); + } + + @Test + public void testEmptyTuple() + { + testSerde(StringTuple.create()); + testSerde(new StringTuple[]{}); + } + + @Test + public void testArrayWithNullAndEmptyString() + { + testSerde(StringTuple.create("")); + testSerde(StringTuple.create("abc", "def", "")); + testSerde(StringTuple.create("abc", null, "def")); + testSerde(new StringTuple[]{StringTuple.create(null, null), StringTuple.create(null, null)}); + testSerde(new StringTuple[]{StringTuple.create("", ""), StringTuple.create("")}); + testSerde(StringTuple.create("", null, "abc")); + } + + @Test + public void testSizeOf() + { + StringTuple stringTuple = StringTuple.create("a", "b"); + Assert.assertEquals(serde.sizeOf(stringTuple), serde.serializeToByteArray(stringTuple).length); + } + + private void testSerde(StringTuple stringTuple) + { + byte[] bytes = serde.serializeToByteArray(stringTuple); + Assert.assertEquals(serde.sizeOf(stringTuple), bytes.length); + + Memory wrappedMemory = Memory.wrap(bytes); + Assert.assertEquals(serde.sizeOf(wrappedMemory, 0, 1), bytes.length); + + StringTuple[] deserialized = serde.deserializeFromMemory(wrappedMemory, 1); + Assert.assertArrayEquals(new StringTuple[]{stringTuple}, deserialized); + } + + private void testSerde(StringTuple[] inputArray) + { + byte[] bytes = serde.serializeToByteArray(inputArray); + Assert.assertEquals(serde.sizeOf(inputArray), bytes.length); + + Memory wrappedMemory = Memory.wrap(bytes); + Assert.assertEquals(serde.sizeOf(wrappedMemory, 0, inputArray.length), bytes.length); + + StringTuple[] deserialized = serde.deserializeFromMemory(wrappedMemory, inputArray.length); + Assert.assertArrayEquals(inputArray, deserialized); + } + +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerdeTest.java index 927f311e4f95..bce82ee0dbc0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerdeTest.java @@ -84,8 +84,9 @@ public void testIllegalStrLength() private void testSerde(String... inputArray) { byte[] bytes = serde.serializeToByteArray(inputArray); + Assert.assertEquals(serde.sizeOf(inputArray), bytes.length); String[] deserialized = serde.deserializeFromMemory(Memory.wrap(bytes), inputArray.length); - Assert.assertEquals(inputArray, deserialized); + Assert.assertArrayEquals(inputArray, deserialized); } } diff --git a/licenses.yaml b/licenses.yaml index f2c01acaffde..0632e36f7fda 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -3477,7 +3477,7 @@ name: DataSketches license_category: binary module: java-core license_name: Apache License version 2.0 -version: 4.1.0 +version: 4.2.0 libraries: - org.apache.datasketches: datasketches-java diff --git a/pom.xml b/pom.xml index bf7a43e09d88..5fb1f49b2f2e 100644 --- a/pom.xml +++ b/pom.xml @@ -86,7 +86,7 @@ default_config.fmpp --> 1.35.0 - 4.1.0 + 4.2.0 2.2.0 10.14.2.0 4.2.19