From a543b31625b36afa57de28e9a0795348ccd40bed Mon Sep 17 00:00:00 2001
From: Alexander Saydakov <13126686+AlexanderSaydakov@users.noreply.github.com>
Date: Thu, 26 Oct 2023 16:28:33 -0700
Subject: [PATCH] use datasketches-java 4.2.0 (#15257)

* use datasketches-java 4.2.0

* use exclusive mode (QuantileSearchCriteria.EXCLUSIVE) when computing partition boundaries

* fixed issues raised by CodeQL

* fixed issue raised by spotbugs

* fixed issues raised by intellij

* added missing import

* Update QuantilesSketchKeyCollector search mode and adjust tests.

* Update sizeOf functions and add unit tests

* Add unit tests

---------

Co-authored-by: AlexanderSaydakov <AlexanderSaydakov@users.noreply.github.com>
Co-authored-by: Gian Merlino <gianmerlino@gmail.com>
Co-authored-by: Adarsh Sanjeev <adarshsanjeev@gmail.com>
---
 .../KllDoublesSketchAggregatorFactory.java    |  4 +-
 .../kll/KllFloatsSketchAggregatorFactory.java |  4 +-
 ...llDoublesSketchComplexMetricSerdeTest.java |  6 +-
 ...KllFloatsSketchComplexMetricSerdeTest.java |  4 +-
 .../QuantilesSketchKeyCollector.java          |  3 +-
 .../QuantilesSketchKeyCollectorFactory.java   | 60 +++++++++++--
 .../apache/druid/msq/exec/MSQSelectTest.java  | 28 +++---
 .../SqlMSQStatementResourcePostTest.java      | 25 ++++--
 .../msq/statistics/ByteRowKeySerdeTest.java   | 83 +++++++++++++++++
 .../ArrayOfStringTuplesSerDe.java             | 81 +++++++++++++++--
 .../ArrayOfStringsNullSafeSerde.java          | 12 ++-
 .../parallel/distribution/StringSketch.java   |  3 +-
 .../ArrayOfStringTuplesSerDeTest.java         | 89 +++++++++++++++++++
 .../ArrayOfStringsNullSafeSerdeTest.java      |  3 +-
 licenses.yaml                                 |  2 +-
 pom.xml                                       |  2 +-
 16 files changed, 359 insertions(+), 50 deletions(-)
 create mode 100644 extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/statistics/ByteRowKeySerdeTest.java
 create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDeTest.java

diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactory.java
index 815227adf55b..267953e23647 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactory.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactory.java
@@ -22,6 +22,8 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.datasketches.kll.KllDoublesSketch;
+import org.apache.datasketches.kll.KllSketch;
+import org.apache.datasketches.kll.KllSketch.SketchType;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException;
 import org.apache.druid.query.aggregation.AggregatorUtil;
@@ -124,7 +126,7 @@ Class<KllDoublesSketch> getSketchClass()
   @Override
   int getMaxSerializedSizeBytes(final int k, final long n)
   {
-    return KllDoublesSketch.getMaxSerializedSizeBytes(k, n, true);
+    return KllSketch.getMaxSerializedSizeBytes(k, n, SketchType.DOUBLES_SKETCH, true);
   }
 
   @Override
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchAggregatorFactory.java
index 9cc61524615c..bdd672ab1257 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchAggregatorFactory.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchAggregatorFactory.java
@@ -22,6 +22,8 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.datasketches.kll.KllFloatsSketch;
+import org.apache.datasketches.kll.KllSketch;
+import org.apache.datasketches.kll.KllSketch.SketchType;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException;
 import org.apache.druid.query.aggregation.AggregatorUtil;
@@ -124,7 +126,7 @@ Class<KllFloatsSketch> getSketchClass()
   @Override
   int getMaxSerializedSizeBytes(final int k, final long n)
   {
-    return KllFloatsSketch.getMaxSerializedSizeBytes(k, n, true);
+    return KllSketch.getMaxSerializedSizeBytes(k, n, SketchType.FLOATS_SKETCH, true);
   }
 
   @Override
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchComplexMetricSerdeTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchComplexMetricSerdeTest.java
index d0a263079906..730fb54c541d 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchComplexMetricSerdeTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchComplexMetricSerdeTest.java
@@ -114,7 +114,7 @@ public void testSafeRead()
     objectStrategy.fromByteBufferSafe(buf, bytes.length).toByteArray();
 
     // corrupted sketch should fail with a regular java buffer exception, not all subsets actually fail with the same
-    // index out of bounds exceptions, but at least this many do
+    // sketch exception type, but at least this many do
     for (int subset = 3; subset < 24; subset++) {
       final byte[] garbage2 = new byte[subset];
       for (int i = 0; i < garbage2.length; i++) {
@@ -123,7 +123,7 @@ public void testSafeRead()
 
       final ByteBuffer buf2 = ByteBuffer.wrap(garbage2).order(ByteOrder.LITTLE_ENDIAN);
       Assert.assertThrows(
-          IndexOutOfBoundsException.class,
+          Exception.class,
           () -> objectStrategy.fromByteBufferSafe(buf2, garbage2.length).toByteArray()
       );
     }
@@ -132,7 +132,7 @@ public void testSafeRead()
     final byte[] garbage = new byte[]{0x01, 0x02};
     final ByteBuffer buf3 = ByteBuffer.wrap(garbage).order(ByteOrder.LITTLE_ENDIAN);
     Assert.assertThrows(
-        IndexOutOfBoundsException.class,
+        Exception.class,
         () -> objectStrategy.fromByteBufferSafe(buf3, garbage.length).toByteArray()
     );
   }
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchComplexMetricSerdeTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchComplexMetricSerdeTest.java
index 56a397789906..ee505fe65b88 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchComplexMetricSerdeTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchComplexMetricSerdeTest.java
@@ -123,7 +123,7 @@ public void testSafeRead()
 
       final ByteBuffer buf2 = ByteBuffer.wrap(garbage2).order(ByteOrder.LITTLE_ENDIAN);
       Assert.assertThrows(
-          IndexOutOfBoundsException.class,
+          Exception.class,
           () -> objectStrategy.fromByteBufferSafe(buf2, garbage2.length).toByteArray()
       );
     }
@@ -132,7 +132,7 @@ public void testSafeRead()
     final byte[] garbage = new byte[]{0x01, 0x02};
     final ByteBuffer buf3 = ByteBuffer.wrap(garbage).order(ByteOrder.LITTLE_ENDIAN);
     Assert.assertThrows(
-        IndexOutOfBoundsException.class,
+        Exception.class,
         () -> objectStrategy.fromByteBufferSafe(buf3, garbage.length).toByteArray()
     );
   }
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollector.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollector.java
index 607265367c2c..a20ff40cc870 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollector.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollector.java
@@ -23,6 +23,7 @@
 import com.google.common.primitives.Ints;
 import org.apache.datasketches.quantiles.ItemsSketch;
 import org.apache.datasketches.quantiles.ItemsUnion;
+import org.apache.datasketches.quantilescommon.QuantileSearchCriteria;
 import org.apache.druid.frame.key.ClusterByPartition;
 import org.apache.druid.frame.key.ClusterByPartitions;
 import org.apache.druid.frame.key.RowKey;
@@ -149,7 +150,7 @@ public ClusterByPartitions generatePartitionsWithTargetWeight(final long targetW
 
     final int numPartitions = Ints.checkedCast(LongMath.divide(sketch.getN(), targetWeight, RoundingMode.CEILING));
 
-    final byte[][] quantiles = (sketch.getPartitionBoundaries(numPartitions)).boundaries;
+    final byte[][] quantiles = (sketch.getPartitionBoundaries(numPartitions, QuantileSearchCriteria.EXCLUSIVE)).boundaries;
     final List<ClusterByPartition> partitions = new ArrayList<>();
 
     for (int i = 0; i < numPartitions; i++) {
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java
index 3192813cfe1a..674dfe15acbb 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java
@@ -24,6 +24,7 @@
 import com.fasterxml.jackson.databind.JsonDeserializer;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.datasketches.common.ArrayOfItemsSerDe;
+import org.apache.datasketches.common.ByteArrayUtil;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.memory.WritableMemory;
 import org.apache.datasketches.quantiles.ItemsSketch;
@@ -32,6 +33,7 @@
 
 import java.io.IOException;
 import java.nio.ByteOrder;
+import java.util.Arrays;
 import java.util.Comparator;
 
 public class QuantilesSketchKeyCollectorFactory
@@ -91,9 +93,9 @@ public QuantilesSketchKeyCollector fromSnapshot(QuantilesSketchKeyCollectorSnaps
     return new QuantilesSketchKeyCollector(comparator, sketch, snapshot.getAverageKeyLength());
   }
 
-  private static class ByteRowKeySerde extends ArrayOfItemsSerDe<byte[]>
+  static class ByteRowKeySerde extends ArrayOfItemsSerDe<byte[]>
   {
-    private static final ByteRowKeySerde INSTANCE = new ByteRowKeySerde();
+    static final ByteRowKeySerde INSTANCE = new ByteRowKeySerde();
 
     private ByteRowKeySerde()
     {
@@ -126,22 +128,66 @@ public byte[] serializeToByteArray(final byte[][] items)
     }
 
     @Override
-    public byte[][] deserializeFromMemory(final Memory mem, final int numItems)
+    public byte[][] deserializeFromMemory(final Memory mem, long offsetBytes, final int numItems)
     {
       final byte[][] keys = new byte[numItems][];
-      long keyPosition = (long) Integer.BYTES * numItems;
+      final long start = offsetBytes;
+      offsetBytes += (long) Integer.BYTES * numItems;
 
       for (int i = 0; i < numItems; i++) {
-        final int keyLength = mem.getInt((long) Integer.BYTES * i);
+        final int keyLength = mem.getInt(start + (long) Integer.BYTES * i);
         final byte[] keyBytes = new byte[keyLength];
 
-        mem.getByteArray(keyPosition, keyBytes, 0, keyLength);
+        mem.getByteArray(offsetBytes, keyBytes, 0, keyLength);
         keys[i] = keyBytes;
 
-        keyPosition += keyLength;
+        offsetBytes += keyLength;
       }
 
       return keys;
     }
+
+    @Override
+    public byte[] serializeToByteArray(final byte[] item)
+    {
+      final byte[] bytes = new byte[Integer.BYTES + item.length];
+      ByteArrayUtil.putIntLE(bytes, 0, item.length);
+      ByteArrayUtil.copyBytes(item, 0, bytes, Integer.BYTES, item.length);
+      return bytes;
+    }
+
+    @Override
+    public byte[][] deserializeFromMemory(final Memory mem, final int numItems)
+    {
+      return deserializeFromMemory(mem, 0, numItems);
+    }
+
+    @Override
+    public int sizeOf(final byte[] item)
+    {
+      return Integer.BYTES + item.length;
+    }
+
+    @Override
+    public int sizeOf(final Memory mem, long offsetBytes, final int numItems)
+    {
+      int length = Integer.BYTES * numItems;
+      for (int i = 0; i < numItems; i++) {
+        length += mem.getInt(offsetBytes + (long) Integer.BYTES * i);
+      }
+      return length;
+    }
+
+    @Override
+    public String toString(final byte[] item)
+    {
+      return Arrays.toString(item);
+    }
+
+    @Override
+    public Class<?> getClassOfT()
+    {
+      return byte[].class;
+    }
   }
 }
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
index b63ee479e202..441c98b91d8b 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
@@ -226,7 +226,7 @@ public void testSelectOnFoo()
         .setExpectedCountersForStageWorkerChannel(
             CounterSnapshotMatcher
                 .with()
-                .rows(isPageSizeLimited() ? new long[]{1, 2, 3} : new long[]{6})
+                .rows(isPageSizeLimited() ? new long[]{2, 2, 2} : new long[]{6})
                 .frames(isPageSizeLimited() ? new long[]{1, 1, 1} : new long[]{1}),
             0, 0, "shuffle"
         )
@@ -290,7 +290,9 @@ public void testSelectOnFoo2()
         )
         .setExpectedCountersForStageWorkerChannel(
             CounterSnapshotMatcher
-                .with().rows(3).frames(1),
+                .with()
+                .rows(isPageSizeLimited() ? new long[]{1L, 2L} : new long[]{3L})
+                .frames(isPageSizeLimited() ? new long[]{1L, 1L} : new long[]{1L}),
             0, 0, "shuffle"
         )
         .verifyResults();
@@ -353,7 +355,7 @@ public void testSelectOnFooDuplicateColumnNames()
         .setExpectedCountersForStageWorkerChannel(
             CounterSnapshotMatcher
                 .with()
-                .rows(isPageSizeLimited() ? new long[]{1, 2, 3} : new long[]{6})
+                .rows(isPageSizeLimited() ? new long[]{2, 2, 2} : new long[]{6})
                 .frames(isPageSizeLimited() ? new long[]{1, 1, 1} : new long[]{1}),
             0, 0, "shuffle"
         )
@@ -1442,8 +1444,8 @@ public void testExternSelectWithMultipleWorkers() throws IOException
         .setExpectedCountersForStageWorkerChannel(
             CounterSnapshotMatcher
                 .with()
-                .rows(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 2L} : new long[]{5L})
-                .frames(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 1L} : new long[]{1L}),
+                .rows(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 1L, 1L} : new long[]{5L})
+                .frames(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 1L, 1L} : new long[]{1L}),
             0, 0, "shuffle"
         )
         .setExpectedCountersForStageWorkerChannel(
@@ -1459,27 +1461,27 @@ public void testExternSelectWithMultipleWorkers() throws IOException
         .setExpectedCountersForStageWorkerChannel(
             CounterSnapshotMatcher
                 .with()
-                .rows(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 2L} : new long[]{5L})
-                .frames(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 1L} : new long[]{1L}),
+                .rows(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 1L, 1L} : new long[]{5L})
+                .frames(isPageSizeLimited() ? new long[]{1L, 1L, 1L, 1L, 1L} : new long[]{1L}),
             0, 1, "shuffle"
         );
     // adding result stage counter checks
     if (isPageSizeLimited()) {
       selectTester.setExpectedCountersForStageWorkerChannel(
           CounterSnapshotMatcher
-              .with().rows(2, 0, 2),
+              .with().rows(2, 0, 2, 0, 2),
           1, 0, "input0"
       ).setExpectedCountersForStageWorkerChannel(
           CounterSnapshotMatcher
-              .with().rows(2, 0, 2),
+              .with().rows(2, 0, 2, 0, 2),
           1, 0, "output"
       ).setExpectedCountersForStageWorkerChannel(
           CounterSnapshotMatcher
-              .with().rows(0, 2, 0, 4),
+              .with().rows(0, 2, 0, 2),
           1, 1, "input0"
       ).setExpectedCountersForStageWorkerChannel(
           CounterSnapshotMatcher
-              .with().rows(0, 2, 0, 4),
+              .with().rows(0, 2, 0, 2),
           1, 1, "output"
       );
     }
@@ -1600,7 +1602,9 @@ public void testSelectOnUserDefinedSourceContainingWith()
         )
         .setExpectedCountersForStageWorkerChannel(
             CounterSnapshotMatcher
-                .with().rows(3).frames(1),
+                .with()
+                .rows(isPageSizeLimited() ? new long[]{1, 2} : new long[]{3})
+                .frames(isPageSizeLimited() ? new long[]{1, 1} : new long[]{1}),
             0, 0, "shuffle"
         )
         .verifyResults();
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java
index 6650c7785555..070b3ae46a71 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java
@@ -325,9 +325,9 @@ public void testWithDurableStorage() throws IOException
     ).getEntity();
 
     Assert.assertEquals(ImmutableList.of(
-        new PageInformation(0, 1L, 75L, 0, 0),
-        new PageInformation(1, 2L, 121L, 0, 1),
-        new PageInformation(2, 3L, 164L, 0, 2)
+        new PageInformation(0, 2L, 120L, 0, 0),
+        new PageInformation(1, 2L, 118L, 0, 1),
+        new PageInformation(2, 2L, 122L, 0, 2)
     ), sqlStatementResult.getResultSetInformation().getPages());
 
     assertExpectedResults(
@@ -348,7 +348,9 @@ public void testWithDurableStorage() throws IOException
     );
 
     assertExpectedResults(
-        "{\"cnt\":1,\"dim1\":\"\"}\n\n",
+        "{\"cnt\":1,\"dim1\":\"\"}\n"
+        + "{\"cnt\":1,\"dim1\":\"10.1\"}\n"
+        + "\n",
         resource.doGetResults(
             sqlStatementResult.getQueryId(),
             0L,
@@ -359,8 +361,7 @@ public void testWithDurableStorage() throws IOException
     );
 
     assertExpectedResults(
-        "{\"cnt\":1,\"dim1\":\"1\"}\n"
-        + "{\"cnt\":1,\"dim1\":\"def\"}\n"
+        "{\"cnt\":1,\"dim1\":\"def\"}\n"
         + "{\"cnt\":1,\"dim1\":\"abc\"}\n"
         + "\n",
         resource.doGetResults(
@@ -412,7 +413,8 @@ public void testMultipleWorkersWithPageSizeLimiting() throws IOException
         new PageInformation(0, 2L, 128L, 0, 0),
         new PageInformation(1, 2L, 132L, 1, 1),
         new PageInformation(2, 2L, 128L, 0, 2),
-        new PageInformation(3, 4L, 228L, 1, 3)
+        new PageInformation(3, 2L, 132L, 1, 3),
+        new PageInformation(4, 2L, 130L, 0, 4)
     ), sqlStatementResult.getResultSetInformation().getPages());
 
 
@@ -457,12 +459,19 @@ public void testMultipleWorkersWithPageSizeLimiting() throws IOException
         SqlStatementResourceTest.makeOkRequest()
     )));
 
-    Assert.assertEquals(rows.subList(6, 10), SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults(
+    Assert.assertEquals(rows.subList(6, 8), SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults(
         sqlStatementResult.getQueryId(),
         3L,
         ResultFormat.ARRAY.name(),
         SqlStatementResourceTest.makeOkRequest()
     )));
+
+    Assert.assertEquals(rows.subList(8, 10), SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults(
+        sqlStatementResult.getQueryId(),
+        4L,
+        ResultFormat.ARRAY.name(),
+        SqlStatementResourceTest.makeOkRequest()
+    )));
   }
 
   @Test
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/statistics/ByteRowKeySerdeTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/statistics/ByteRowKeySerdeTest.java
new file mode 100644
index 000000000000..9226ab4c81f0
--- /dev/null
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/statistics/ByteRowKeySerdeTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.statistics;
+
+import org.apache.datasketches.memory.Memory;
+import org.apache.druid.frame.key.KeyTestUtils;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ByteRowKeySerdeTest extends InitializedNullHandlingTest
+{
+  private final QuantilesSketchKeyCollectorFactory.ByteRowKeySerde serde =
+      QuantilesSketchKeyCollectorFactory.ByteRowKeySerde.INSTANCE;
+
+  @Test
+  public void testByteArraySerde()
+  {
+    testSerde(new byte[]{1, 5, 9, 3});
+    testSerde(new byte[][]{new byte[]{1, 5}, new byte[]{2, 3}, new byte[]{6, 7}});
+  }
+
+  @Test
+  public void testSerdeWithRowKeys()
+  {
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("x", ColumnType.LONG)
+                                            .add("y", ColumnType.LONG)
+                                            .build();
+
+    testSerde(KeyTestUtils.createKey(rowSignature, 2, 4).array());
+  }
+
+  @Test
+  public void testEmptyArray()
+  {
+    testSerde(new byte[][]{});
+    testSerde(new byte[][]{new byte[]{1, 5}, new byte[]{}, new byte[]{2, 3}});
+  }
+
+  private void testSerde(byte[] byteRowKey)
+  {
+    byte[] bytes = serde.serializeToByteArray(byteRowKey);
+    Assert.assertEquals(serde.sizeOf(byteRowKey), bytes.length);
+
+    Memory wrappedMemory = Memory.wrap(bytes);
+    Assert.assertEquals(serde.sizeOf(wrappedMemory, 0, 1), bytes.length);
+
+    byte[][] deserialized = serde.deserializeFromMemory(wrappedMemory, 1);
+    Assert.assertArrayEquals(new byte[][]{byteRowKey}, deserialized);
+  }
+
+  private void testSerde(byte[][] inputArray)
+  {
+    byte[] bytes = serde.serializeToByteArray(inputArray);
+    Assert.assertEquals(serde.sizeOf(inputArray), bytes.length);
+
+    Memory wrappedMemory = Memory.wrap(bytes);
+    Assert.assertEquals(serde.sizeOf(wrappedMemory, 0, inputArray.length), bytes.length);
+
+    byte[][] deserialized = serde.deserializeFromMemory(wrappedMemory, inputArray.length);
+    Assert.assertArrayEquals(inputArray, deserialized);
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java
index e3f76b5b92fd..8bba9f65aafd 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java
@@ -21,8 +21,9 @@
 
 import org.apache.datasketches.common.ArrayOfItemsSerDe;
 import org.apache.datasketches.common.ArrayOfStringsSerDe;
+import org.apache.datasketches.common.ByteArrayUtil;
+import org.apache.datasketches.common.Util;
 import org.apache.datasketches.memory.Memory;
-import org.apache.datasketches.memory.WritableMemory;
 import org.apache.datasketches.memory.internal.UnsafeUtil;
 import org.apache.druid.data.input.StringTuple;
 
@@ -36,7 +37,7 @@ public class ArrayOfStringTuplesSerDe extends ArrayOfItemsSerDe<StringTuple>
   private static final ArrayOfStringsNullSafeSerde STRINGS_SERDE = new ArrayOfStringsNullSafeSerde();
 
   @Override
-  public byte[] serializeToByteArray(StringTuple[] items)
+  public byte[] serializeToByteArray(final StringTuple[] items)
   {
     int length = 0;
     final byte[][] itemsBytes = new byte[items.length][];
@@ -49,29 +50,27 @@ public byte[] serializeToByteArray(StringTuple[] items)
     }
 
     final byte[] bytes = new byte[length];
-    final WritableMemory mem = WritableMemory.writableWrap(bytes);
-    long offsetBytes = 0;
+    int offsetBytes = 0;
     for (int i = 0; i < items.length; i++) {
       // Add the number of items in the StringTuple
-      mem.putInt(offsetBytes, items[i].size());
+      ByteArrayUtil.putIntLE(bytes, offsetBytes, items[i].size());
       offsetBytes += Integer.BYTES;
 
       // Add the size of byte content for the StringTuple
-      mem.putInt(offsetBytes, itemsBytes[i].length);
+      ByteArrayUtil.putIntLE(bytes, offsetBytes, itemsBytes[i].length);
       offsetBytes += Integer.BYTES;
 
       // Add the byte contents of the StringTuple
-      mem.putByteArray(offsetBytes, itemsBytes[i], 0, itemsBytes[i].length);
+      ByteArrayUtil.copyBytes(itemsBytes[i], 0, bytes, offsetBytes, itemsBytes[i].length);
       offsetBytes += itemsBytes[i].length;
     }
     return bytes;
   }
 
   @Override
-  public StringTuple[] deserializeFromMemory(Memory mem, int numItems)
+  public StringTuple[] deserializeFromMemory(final Memory mem, long offsetBytes, final int numItems)
   {
     final StringTuple[] array = new StringTuple[numItems];
-    long offsetBytes = 0;
     for (int i = 0; i < numItems; i++) {
       // Read the number of items in the StringTuple
       UnsafeUtil.checkBounds(offsetBytes, Integer.BYTES, mem.getCapacity());
@@ -96,4 +95,68 @@ public StringTuple[] deserializeFromMemory(Memory mem, int numItems)
     }
     return array;
   }
+
+  @Override
+  public byte[] serializeToByteArray(final StringTuple item)
+  {
+    final byte[] itemBytes = STRINGS_SERDE.serializeToByteArray(item.toArray());
+    final byte[] bytes = new byte[Integer.BYTES * 2 + itemBytes.length];
+    int offsetBytes = 0;
+    ByteArrayUtil.putIntLE(bytes, offsetBytes, item.size());
+    offsetBytes += Integer.BYTES;
+    ByteArrayUtil.putIntLE(bytes, offsetBytes, itemBytes.length);
+    offsetBytes += Integer.BYTES;
+    ByteArrayUtil.copyBytes(itemBytes, 0, bytes, offsetBytes, itemBytes.length);
+    return bytes;
+  }
+
+  @Override
+  public StringTuple[] deserializeFromMemory(final Memory mem, final int numItems)
+  {
+    return deserializeFromMemory(mem, 0, numItems);
+  }
+
+  @Override
+  public int sizeOf(final StringTuple item)
+  {
+    // Two integers to store number of strings in the tuple and the size of the byte array
+    int length = 2 * Integer.BYTES;
+    for (final String s : item.toArray()) {
+      length += STRINGS_SERDE.sizeOf(s);
+    }
+    return length;
+  }
+
+  @Override
+  public int sizeOf(final Memory mem, long offsetBytes, final int numItems)
+  {
+    final long start = offsetBytes;
+    for (int i = 0; i < numItems; i++) {
+      // Skip the number of items in the StringTuple
+      Util.checkBounds(offsetBytes, Integer.BYTES, mem.getCapacity());
+      offsetBytes += Integer.BYTES;
+
+      // Read the size of byte content
+      Util.checkBounds(offsetBytes, Integer.BYTES, mem.getCapacity());
+      final int byteContentSize = mem.getInt(offsetBytes);
+      offsetBytes += Integer.BYTES;
+
+      // Skip the byte content
+      Util.checkBounds(offsetBytes, byteContentSize, mem.getCapacity());
+      offsetBytes += byteContentSize;
+    }
+    return (int) (offsetBytes - start);
+  }
+
+  @Override
+  public String toString(final StringTuple item)
+  {
+    return item.toString();
+  }
+
+  @Override
+  public Class<?> getClassOfT()
+  {
+    return StringTuple.class;
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java
index b5a8393b1724..cd1057417170 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.indexing.common.task.batch.parallel.distribution;
 
-import org.apache.datasketches.common.ArrayOfItemsSerDe;
 import org.apache.datasketches.common.ArrayOfStringsSerDe;
 import org.apache.datasketches.common.Util;
 import org.apache.datasketches.memory.Memory;
@@ -35,7 +34,7 @@
  * The implementation is the same as {@link ArrayOfStringsSerDe}, except this
  * class handles null String values as well.
  */
-public class ArrayOfStringsNullSafeSerde extends ArrayOfItemsSerDe<String>
+public class ArrayOfStringsNullSafeSerde extends ArrayOfStringsSerDe
 {
 
   private static final int NULL_STRING_LENGTH = -1;
@@ -106,5 +105,14 @@ public String[] deserializeFromMemory(final Memory mem, final int numItems)
     return array;
   }
 
+  @Override
+  public int sizeOf(String item)
+  {
+    if (item == null) {
+      return Integer.BYTES;
+    } else {
+      return super.sizeOf(item);
+    }
+  }
 }
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java
index e47fcf78c135..e2794f6b1caf 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java
@@ -33,6 +33,7 @@
 import com.google.common.base.Preconditions;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.quantiles.ItemsSketch;
+import org.apache.datasketches.quantilescommon.QuantileSearchCriteria;
 import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.timeline.partition.PartitionBoundaries;
 
@@ -128,7 +129,7 @@ private PartitionBoundaries getEvenPartitionsByCount(int evenPartitionCount)
     if (delegate.isEmpty()) {
       return new PartitionBoundaries(new StringTuple[0]);
     }
-    return new PartitionBoundaries((delegate.getPartitionBoundaries(evenPartitionCount)).boundaries);
+    return new PartitionBoundaries((delegate.getPartitionBoundaries(evenPartitionCount, QuantileSearchCriteria.EXCLUSIVE)).boundaries);
   }
 
   @Override
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDeTest.java
new file mode 100644
index 000000000000..9b909948ad04
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDeTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel.distribution;
+
+import org.apache.datasketches.memory.Memory;
+import org.apache.druid.data.input.StringTuple;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Round-trip and size-accounting tests for {@link ArrayOfStringTuplesSerDe}.
+ */
+public class ArrayOfStringTuplesSerDeTest
+{
+
+  private final ArrayOfStringTuplesSerDe serde = new ArrayOfStringTuplesSerDe();
+
+  @Test
+  public void testStringTupleSerde()
+  {
+    testSerde(StringTuple.create("abc"));
+    testSerde(StringTuple.create("abc", "def", "xyz"));
+    testSerde(new StringTuple[]{StringTuple.create("abc"), StringTuple.create("def", "efg"), StringTuple.create("z")});
+  }
+
+  @Test
+  public void testEmptyTuple()
+  {
+    testSerde(StringTuple.create());
+    testSerde(new StringTuple[]{});
+  }
+
+  @Test
+  public void testArrayWithNullAndEmptyString()
+  {
+    testSerde(StringTuple.create(""));
+    testSerde(StringTuple.create("abc", "def", ""));
+    testSerde(StringTuple.create("abc", null, "def"));
+    testSerde(new StringTuple[]{StringTuple.create(null, null), StringTuple.create(null, null)});
+    testSerde(new StringTuple[]{StringTuple.create("", ""), StringTuple.create("")});
+    testSerde(StringTuple.create("", null, "abc"));
+  }
+
+  @Test
+  public void testSizeOf()
+  {
+    StringTuple stringTuple = StringTuple.create("a", "b");
+    Assert.assertEquals(serde.serializeToByteArray(stringTuple).length, serde.sizeOf(stringTuple));
+  }
+
+  /**
+   * Serializes a single tuple, checks that both {@code sizeOf} variants agree with the
+   * serialized length, and checks that deserializing one item yields an equal tuple.
+   */
+  private void testSerde(StringTuple stringTuple)
+  {
+    byte[] bytes = serde.serializeToByteArray(stringTuple);
+    // sizeOf is the value under test, so the serialized length goes in the "expected" slot.
+    Assert.assertEquals(bytes.length, serde.sizeOf(stringTuple));
+
+    Memory wrappedMemory = Memory.wrap(bytes);
+    Assert.assertEquals(bytes.length, serde.sizeOf(wrappedMemory, 0, 1));
+
+    StringTuple[] deserialized = serde.deserializeFromMemory(wrappedMemory, 1);
+    Assert.assertArrayEquals(new StringTuple[]{stringTuple}, deserialized);
+  }
+
+  /**
+   * Serializes an array of tuples, checks that both {@code sizeOf} variants agree with the
+   * serialized length, and checks that deserializing the same item count yields an equal array.
+   */
+  private void testSerde(StringTuple[] inputArray)
+  {
+    byte[] bytes = serde.serializeToByteArray(inputArray);
+    Assert.assertEquals(bytes.length, serde.sizeOf(inputArray));
+
+    Memory wrappedMemory = Memory.wrap(bytes);
+    Assert.assertEquals(bytes.length, serde.sizeOf(wrappedMemory, 0, inputArray.length));
+
+    StringTuple[] deserialized = serde.deserializeFromMemory(wrappedMemory, inputArray.length);
+    Assert.assertArrayEquals(inputArray, deserialized);
+  }
+
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerdeTest.java
index 927f311e4f95..bce82ee0dbc0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerdeTest.java
@@ -84,8 +84,10 @@ public void testIllegalStrLength()
   private void testSerde(String... inputArray)
   {
     byte[] bytes = serde.serializeToByteArray(inputArray);
+    // sizeOf is the value under test; the serialized length is the expected size.
+    Assert.assertEquals(bytes.length, serde.sizeOf(inputArray));
     String[] deserialized = serde.deserializeFromMemory(Memory.wrap(bytes), inputArray.length);
-    Assert.assertEquals(inputArray, deserialized);
+    Assert.assertArrayEquals(inputArray, deserialized);
   }
 
 }
diff --git a/licenses.yaml b/licenses.yaml
index f2c01acaffde..0632e36f7fda 100644
--- a/licenses.yaml
+++ b/licenses.yaml
@@ -3477,7 +3477,7 @@ name: DataSketches
 license_category: binary
 module: java-core
 license_name: Apache License version 2.0
-version: 4.1.0
+version: 4.2.0
 libraries:
   - org.apache.datasketches: datasketches-java
 
diff --git a/pom.xml b/pom.xml
index bf7a43e09d88..5fb1f49b2f2e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,7 +86,7 @@
              default_config.fmpp
           -->
         <calcite.version>1.35.0</calcite.version>
-        <datasketches.version>4.1.0</datasketches.version>
+        <datasketches.version>4.2.0</datasketches.version>
         <datasketches.memory.version>2.2.0</datasketches.memory.version>
         <derby.version>10.14.2.0</derby.version>
         <dropwizard.metrics.version>4.2.19</dropwizard.metrics.version>