From 2726c6f38823d42f7f82d3b4e8742f7a80fff19f Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Thu, 21 Nov 2024 15:37:55 +0530 Subject: [PATCH] Minor refactors to processing Some refactors across druid to clean up the code and add utility functions where required. --- .../apache/druid/msq/test/MSQTestBase.java | 3 +- .../query/aggregation/SerializedStorage.java | 195 ++++++---- .../org/apache/druid/segment/IndexIO.java | 3 +- .../org/apache/druid/segment/IndexSpec.java | 2 +- .../StringUtf8DictionaryEncodedColumn.java | 11 +- .../BlockLayoutColumnarDoublesSupplier.java | 12 +- .../BlockLayoutColumnarLongsSupplier.java | 15 +- .../CompressedVSizeColumnarIntsSupplier.java | 7 +- .../segment/data/CompressionFactory.java | 1 - .../NestedCommonFormatColumnSerializer.java | 2 +- .../nested/NestedDataComplexTypeSerde.java | 86 ++++- .../segment/nested/ScalarDoubleColumn.java | 27 +- .../ScalarDoubleColumnAndIndexSupplier.java | 21 +- .../segment/nested/ScalarLongColumn.java | 27 +- .../ScalarLongColumnAndIndexSupplier.java | 21 +- .../ScalarStringColumnAndIndexSupplier.java | 9 +- .../druid/segment/nested/VariantColumn.java | 6 +- .../nested/VariantColumnAndIndexSupplier.java | 3 +- .../nested/VariantColumnSerializer.java | 349 +++++++++++------- .../DictionaryEncodedColumnPartSerde.java | 3 +- ...ngUtf8DictionaryEncodedColumnSupplier.java | 15 +- .../druid/segment/serde/cell/IOIterator.java | 4 +- .../druid/segment/serde/cell/StagedSerde.java | 35 ++ .../PredicateValueMatcherFactoryTest.java | 7 +- .../segment/filter/ValueMatchersTest.java | 11 +- 25 files changed, 621 insertions(+), 254 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index e99b571b8072..060f8499e3eb 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -511,7 +511,6 @@ public String getFormatString() binder -> binder.bind(SegmentManager.class).toInstance(EasyMock.createMock(SegmentManager.class)), new JoinableFactoryModule(), new IndexingServiceTuningConfigModule(), - new MSQIndexingModule(), Modules.override(new MSQSqlModule()).with( binder -> { // Our Guice configuration currently requires bindings to exist even if they aren't ever used, the @@ -540,6 +539,7 @@ public String getFormatString() objectMapper = setupObjectMapper(injector); objectMapper.registerModules(new StorageConnectorModule().getJacksonModules()); + objectMapper.registerModules(new MSQIndexingModule().getJacksonModules()); objectMapper.registerModules(sqlModule.getJacksonModules()); objectMapper.registerModules(BuiltInTypesModule.getJacksonModulesList()); @@ -697,7 +697,6 @@ protected Supplier> getSupplierForSegment( break; default: throw new ISE("Cannot query segment %s in test runner", segmentId); - } Segment segment = new Segment() { diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializedStorage.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializedStorage.java index a6fee46b3ca0..29d03d14615e 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SerializedStorage.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializedStorage.java @@ -19,18 +19,17 @@ package org.apache.druid.query.aggregation; -import com.google.common.base.Preconditions; +import 
it.unimi.dsi.fastutil.longs.LongArrayList; +import org.apache.druid.error.DruidException; import org.apache.druid.segment.serde.cell.IOIterator; -import org.apache.druid.segment.serde.cell.IntSerializer; import org.apache.druid.segment.serde.cell.StagedSerde; import org.apache.druid.segment.writeout.WriteOutBytes; import javax.annotation.Nullable; -import java.io.BufferedInputStream; import java.io.IOException; -import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.nio.IntBuffer; import java.util.NoSuchElementException; /** @@ -45,109 +44,181 @@ public class SerializedStorage { private final WriteOutBytes writeOutBytes; private final StagedSerde serde; - private final IntSerializer intSerializer = new IntSerializer(); + private final ByteBuffer itemOffsetsBytes; + private final IntBuffer itemSizes; + + private final LongArrayList rowChunkOffsets = new LongArrayList(); + private int numStored = 0; + private int maxSize = 0; public SerializedStorage(WriteOutBytes writeOutBytes, StagedSerde serde) + { + this(writeOutBytes, serde, 4096); + } + + public SerializedStorage(WriteOutBytes writeOutBytes, StagedSerde serde, int chunkSize) { this.writeOutBytes = writeOutBytes; this.serde = serde; + + this.itemOffsetsBytes = ByteBuffer.allocate(chunkSize * Integer.BYTES).order(ByteOrder.nativeOrder()); + this.itemSizes = itemOffsetsBytes.asIntBuffer(); } public void store(@Nullable T value) throws IOException { byte[] bytes = serde.serialize(value); - writeOutBytes.write(intSerializer.serialize(bytes.length)); - writeOutBytes.write(bytes); + maxSize = Math.max(maxSize, bytes.length); + itemSizes.put(bytes.length); + if (bytes.length > 0) { + writeOutBytes.write(bytes); + } + + ++numStored; + if (itemSizes.remaining() == 0) { + rowChunkOffsets.add(writeOutBytes.size()); + writeOutBytes.write(itemOffsetsBytes); + itemOffsetsBytes.clear(); + itemSizes.clear(); + } } + public int numStored() + { + return numStored; + } + + /** + * Generates an iterator over everything that has been stored. Also signifies the end of storing objects. + * iterator() can be called multiple times if needed, but after iterator() is called, store() can no longer be + * called. 
+ * + * @return an iterator + * @throws IOException on failure + */ public IOIterator iterator() throws IOException { - return new DeserializingIOIterator<>(writeOutBytes.asInputStream(), serde); + if (itemSizes.position() != itemSizes.limit()) { + rowChunkOffsets.add(writeOutBytes.size()); + itemOffsetsBytes.limit(itemSizes.position() * Integer.BYTES); + writeOutBytes.write(itemOffsetsBytes); + + // Move the limit to the position so that we fail subsequent writes and indicate that we are done + itemSizes.limit(itemSizes.position()); + } + + return new DeserializingIOIterator<>( + writeOutBytes, + rowChunkOffsets, + numStored, + itemSizes.capacity(), + maxSize, + serde + ); } private static class DeserializingIOIterator implements IOIterator { - private static final int NEEDS_READ = -2; - private static final int EOF = -1; + private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0).asReadOnlyBuffer(); - private final byte[] intBytes; - private final BufferedInputStream inputStream; + private final WriteOutBytes medium; + private final LongArrayList rowChunkOffsets; + private final int numEntries; + private ByteBuffer tmpBuf; private final StagedSerde serde; - private int nextSize; - - public DeserializingIOIterator(InputStream inputStream, StagedSerde serde) + private final ByteBuffer itemOffsetsBytes; + private final int[] itemSizes; + + private long itemStartOffset; + private int chunkId = 0; + private int currId = 0; + private int itemIndex; + + public DeserializingIOIterator( + WriteOutBytes medium, + LongArrayList rowChunkOffsets, + int numEntries, + int chunkSize, + int maxSize, + StagedSerde serde + ) { - this.inputStream = new BufferedInputStream(inputStream); + this.medium = medium; + this.rowChunkOffsets = rowChunkOffsets; + this.numEntries = numEntries; + this.tmpBuf = ByteBuffer.allocate(maxSize).order(ByteOrder.nativeOrder()); this.serde = serde; - intBytes = new byte[Integer.BYTES]; - nextSize = NEEDS_READ; + + this.itemOffsetsBytes = ByteBuffer.allocate(chunkSize * Integer.BYTES).order(ByteOrder.nativeOrder()); + this.itemSizes = new int[chunkSize]; + this.itemIndex = chunkSize; } @Override - public boolean hasNext() throws IOException + public boolean hasNext() { - return getNextSize() > EOF; + return currId < numEntries; } @Override public T next() throws IOException { - int currentNextSize = getNextSize(); - - if (currentNextSize == -1) { - throw new NoSuchElementException("end of buffer reached"); + if (currId >= numEntries) { + throw new NoSuchElementException(); } - byte[] nextBytes = new byte[currentNextSize]; - int bytesRead = 0; - - while (bytesRead < currentNextSize) { - int result = inputStream.read(nextBytes, bytesRead, currentNextSize - bytesRead); - - if (result == -1) { - throw new NoSuchElementException("unexpected end of buffer reached"); + if (itemIndex >= itemSizes.length) { + if (chunkId == 0) { + itemStartOffset = 0; + } else { + if (itemStartOffset != rowChunkOffsets.getLong(chunkId - 1)) { + throw DruidException.defensive( + "Should have read up to the start of the offsets [%,d], " + + "but for some reason the values [%,d] don't align. 
Possible corruption?", + rowChunkOffsets.getLong(chunkId - 1), + itemStartOffset + ); + } + itemStartOffset += (((long) itemSizes.length) * Integer.BYTES); } - bytesRead += result; - } - - Preconditions.checkState(bytesRead == currentNextSize); - T value = serde.deserialize(nextBytes); - - nextSize = NEEDS_READ; - - return value; - } - - private int getNextSize() throws IOException - { - if (nextSize == NEEDS_READ) { - int bytesRead = 0; - - while (bytesRead < Integer.BYTES) { - int result = inputStream.read(intBytes, bytesRead, Integer.BYTES - bytesRead); + int numToRead = Math.min(itemSizes.length, numEntries - (chunkId * itemSizes.length)); + final long readOffset = rowChunkOffsets.getLong(chunkId++); + itemOffsetsBytes.clear(); + itemOffsetsBytes.limit(numToRead * Integer.BYTES); + medium.readFully(readOffset, itemOffsetsBytes); + itemOffsetsBytes.flip(); + itemOffsetsBytes.asIntBuffer().get(itemSizes, 0, numToRead); - if (result == -1) { - nextSize = EOF; - return EOF; - } else { - bytesRead += result; - } - } - Preconditions.checkState(bytesRead == Integer.BYTES); + itemIndex = 0; + } - nextSize = ByteBuffer.wrap(intBytes).order(ByteOrder.nativeOrder()).getInt(); + int bytesToRead = itemSizes[itemIndex]; + final T retVal; + if (bytesToRead == 0) { + retVal = serde.deserialize(EMPTY_BUFFER); + } else { + tmpBuf.clear(); + tmpBuf.limit(bytesToRead); + medium.readFully(itemStartOffset, tmpBuf); + tmpBuf.flip(); + + retVal = serde.deserialize(tmpBuf); } - return nextSize; + itemStartOffset += bytesToRead; + ++itemIndex; + ++currId; + + return retVal; } @Override - public void close() throws IOException + public void close() { - inputStream.close(); + } } } diff --git a/processing/src/main/java/org/apache/druid/segment/IndexIO.java b/processing/src/main/java/org/apache/druid/segment/IndexIO.java index 8470c63f3e7d..a6ddad614064 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexIO.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexIO.java @@ -456,7 +456,8 @@ public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy, Segmen new StringUtf8DictionaryEncodedColumnSupplier<>( index.getDimValueUtf8Lookup(dimension)::singleThreaded, null, - Suppliers.ofInstance(index.getDimColumn(dimension)) + Suppliers.ofInstance(index.getDimColumn(dimension)), + LEGACY_FACTORY.getBitmapFactory() ) ); GenericIndexed bitmaps = index.getBitmapIndexes().get(dimension); diff --git a/processing/src/main/java/org/apache/druid/segment/IndexSpec.java b/processing/src/main/java/org/apache/druid/segment/IndexSpec.java index 37a7e6cc0d67..1dc2f8496821 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexSpec.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexSpec.java @@ -43,7 +43,7 @@ */ public class IndexSpec { - public static IndexSpec DEFAULT = IndexSpec.builder().build(); + public static final IndexSpec DEFAULT = IndexSpec.builder().build(); public static Builder builder() { diff --git a/processing/src/main/java/org/apache/druid/segment/column/StringUtf8DictionaryEncodedColumn.java b/processing/src/main/java/org/apache/druid/segment/column/StringUtf8DictionaryEncodedColumn.java index bed58a436754..59da8ffcb0a8 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/StringUtf8DictionaryEncodedColumn.java +++ b/processing/src/main/java/org/apache/druid/segment/column/StringUtf8DictionaryEncodedColumn.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.column; import com.google.common.collect.Lists; +import 
org.apache.druid.collections.bitmap.BitmapFactory; import org.apache.druid.common.semantic.SemanticUtils; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.extraction.ExtractionFn; @@ -73,16 +74,19 @@ public class StringUtf8DictionaryEncodedColumn implements DictionaryEncodedColum @Nullable private final ColumnarMultiInts multiValueColumn; private final Indexed utf8Dictionary; + private final BitmapFactory bitmapFactory; public StringUtf8DictionaryEncodedColumn( @Nullable ColumnarInts singleValueColumn, @Nullable ColumnarMultiInts multiValueColumn, - Indexed utf8Dictionary + Indexed utf8Dictionary, + BitmapFactory bitmapFactory ) { this.column = singleValueColumn; this.multiValueColumn = multiValueColumn; this.utf8Dictionary = utf8Dictionary; + this.bitmapFactory = bitmapFactory; } @Override @@ -135,6 +139,11 @@ public int getCardinality() return utf8Dictionary.size(); } + public BitmapFactory getBitmapFactory() + { + return bitmapFactory; + } + @Override public HistoricalDimensionSelector makeDimensionSelector( final ReadableOffset offset, diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java index 4473fa77d461..010e0b698577 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java @@ -36,6 +36,7 @@ public class BlockLayoutColumnarDoublesSupplier implements Supplier> singleThreadedDoubleBuffers = baseDoubleBuffers.singleThreaded(); @@ -91,6 +94,11 @@ private class BlockLayoutColumnarDoubles implements ColumnarDoubles @Nullable DoubleBuffer doubleBuffer; + public CompressionStrategy getCompressionStrategy() + { + return strategy; + } + @Override public int size() { diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java index 2b46d9aa6e29..aa0346c6e34e 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java @@ -43,6 +43,7 @@ public class BlockLayoutColumnarLongsSupplier implements Supplier // The number of longs per buffer. private final int sizePer; private final CompressionFactory.LongEncodingReader baseReader; + private final CompressionStrategy strategy; public BlockLayoutColumnarLongsSupplier( int totalSize, @@ -53,6 +54,7 @@ public BlockLayoutColumnarLongsSupplier( CompressionStrategy strategy ) { + this.strategy = strategy; this.baseLongBuffers = GenericIndexed.read(fromBuffer, DecompressingByteBufferObjectStrategy.of(order, strategy)); this.totalSize = totalSize; this.sizePer = sizePer; @@ -124,7 +126,8 @@ public long get(int index) } } - private class BlockLayoutColumnarLongs implements ColumnarLongs + // This needs to be a public class so that SemanticCreator is able to call it. 
+ public class BlockLayoutColumnarLongs implements ColumnarLongs { final CompressionFactory.LongEncodingReader reader = baseReader.duplicate(); final Indexed> singleThreadedLongBuffers = baseLongBuffers.singleThreaded(); @@ -140,6 +143,16 @@ private class BlockLayoutColumnarLongs implements ColumnarLongs @Nullable LongBuffer longBuffer; + public CompressionFactory.LongEncodingStrategy getEncodingStrategy() + { + return baseReader.getStrategy(); + } + + public CompressionStrategy getCompressionStrategy() + { + return strategy; + } + @Override public int size() { diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java index eaf0b2a47a9b..dfa6667bf01f 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java @@ -308,7 +308,7 @@ protected int _get(ByteBuffer buffer, boolean bigEngian, int bufferIndex) } } - private class CompressedVSizeColumnarInts implements ColumnarInts + public class CompressedVSizeColumnarInts implements ColumnarInts { final Indexed> singleThreadedBuffers = baseBuffers.singleThreaded(); @@ -329,6 +329,11 @@ public int size() return totalSize; } + public CompressionStrategy getCompressionStrategy() + { + return compression; + } + /** * Returns the value at the given index into the column. *

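[Editor's note] The data hunks above make previously-private columnar implementations public (BlockLayoutColumnarLongs, CompressedVSizeColumnarInts) and add accessors such as getCompressionStrategy() so callers (per the comment above, Druid's SemanticCreator machinery) can inspect how a column was stored. A minimal sketch of how a caller might consume the new accessor; the CompressionInspector class and describeCompression helper are hypothetical illustrations, not part of this patch:

    // Hypothetical sketch -- not part of this patch. Assumes only the public
    // inner class and getCompressionStrategy() accessor added in the hunks above.
    import org.apache.druid.segment.data.ColumnarInts;
    import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSupplier;
    import org.apache.druid.segment.data.CompressionStrategy;

    class CompressionInspector
    {
      // Reports which compression strategy backs a ColumnarInts, when it is the
      // (now public) compressed vsize implementation exposed above.
      static String describeCompression(ColumnarInts ints)
      {
        if (ints instanceof CompressedVSizeColumnarIntsSupplier.CompressedVSizeColumnarInts) {
          final CompressionStrategy strategy =
              ((CompressedVSizeColumnarIntsSupplier.CompressedVSizeColumnarInts) ints).getCompressionStrategy();
          return "compressed vsize ints using " + strategy;
        }
        return "unknown ColumnarInts implementation";
      }
    }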
diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java b/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java
index 91ec70b7f171..9e7d2ea5b616 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java
@@ -303,7 +303,6 @@ public interface LongEncodingReader
    */
   LongEncodingReader duplicate();
 
-  @SuppressWarnings("unused")
   LongEncodingStrategy getStrategy();
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
index 54653f286a9f..88b5c240e1b0 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
@@ -79,7 +79,7 @@ protected void writeInternal(FileSmoosher smoosher, Serializer serializer, Strin
     ColumnSerializerUtils.writeInternal(smoosher, serializer, getColumnName(), fileName);
   }
 
-  protected void copyFromTempSmoosh(FileSmoosher smoosher, SmooshedFileMapper fileMapper) throws IOException
+  protected static void copyFromTempSmoosh(FileSmoosher smoosher, SmooshedFileMapper fileMapper) throws IOException
   {
     for (String internalName : fileMapper.getInternalFilenames()) {
       smoosher.add(internalName, fileMapper.mapFile(internalName));
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java
index cb11f62bb0f4..3236b42402c5 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java
@@ -22,7 +22,7 @@
 import com.fasterxml.jackson.core.JsonProcessingException;
 import it.unimi.dsi.fastutil.Hash;
 import org.apache.druid.data.input.impl.DimensionSchema;
-import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.segment.DimensionHandler;
 import org.apache.druid.segment.NestedDataColumnHandlerV4;
@@ -117,29 +117,14 @@ public Class getClazz()
       @Override
       public Object fromByteBuffer(ByteBuffer buffer, int numBytes)
       {
-        final byte[] bytes = new byte[numBytes];
-        buffer.get(bytes, 0, numBytes);
-        try {
-          return ColumnSerializerUtils.SMILE_MAPPER.readValue(bytes, StructuredData.class);
-        }
-        catch (IOException e) {
-          throw new ISE(e, "Unable to deserialize value");
-        }
+        return deserializeBuffer(buffer, numBytes);
       }
 
       @Nullable
       @Override
       public byte[] toBytes(@Nullable Object val)
       {
-        if (val == null) {
-          return new byte[0];
-        }
-        try {
-          return ColumnSerializerUtils.SMILE_MAPPER.writeValueAsBytes(val);
-        }
-        catch (JsonProcessingException e) {
-          throw new ISE(e, "Unable to serialize value [%s]", val);
-        }
+        return serializeToBytes(val);
       }
 
       @Override
@@ -150,6 +135,71 @@ public boolean readRetainsBufferReference()
     };
   }
 
+  /**
+   * Reads the bytes from the position to the limit of the byte buffer argument and deserializes them into
+   * a {@link StructuredData} object using {@link ColumnSerializerUtils#SMILE_MAPPER}.
+   */
+  public static StructuredData deserializeBuffer(ByteBuffer buf)
+  {
+    return deserializeBuffer(buf, buf.remaining());
+  }
+
+  /**
+   * Reads numBytes from the byte buffer argument and deserializes them into a {@link StructuredData} object
+   * using {@link ColumnSerializerUtils#SMILE_MAPPER}.
+   */
+  public static StructuredData deserializeBuffer(ByteBuffer buf, int numBytes)
+  {
+    if (numBytes == 0) {
+      return null;
+    }
+
+    final byte[] bytes = new byte[numBytes];
+    buf.get(bytes, 0, numBytes);
+    return deserializeBytes(bytes);
+  }
+
+  /**
+   * Converts the byte array into a {@link StructuredData} object using {@link ColumnSerializerUtils#SMILE_MAPPER}.
+   */
+  public static StructuredData deserializeBytes(byte[] bytes)
+  {
+    return deserializeBytes(bytes, 0, bytes.length);
+  }
+
+  /**
+   * Reads len bytes starting at offset from the byte array and deserializes a {@link StructuredData} object from
+   * them, using {@link ColumnSerializerUtils#SMILE_MAPPER}.
+   */
+  public static StructuredData deserializeBytes(byte[] bytes, int offset, int len)
+  {
+    if (len == 0) {
+      return null;
+    }
+    try {
+      return ColumnSerializerUtils.SMILE_MAPPER.readValue(bytes, offset, len, StructuredData.class);
+    }
+    catch (IOException e) {
+      throw DruidException.defensive(e, "Unable to deserialize value");
+    }
+  }
+
+  /**
+   * Returns a byte array containing val as serialized by {@link ColumnSerializerUtils#SMILE_MAPPER}.
+   */
+  public static byte[] serializeToBytes(@Nullable Object val)
+  {
+    if (val == null) {
+      return new byte[0];
+    }
+    try {
+      return ColumnSerializerUtils.SMILE_MAPPER.writeValueAsBytes(val);
+    }
+    catch (JsonProcessingException e) {
+      throw DruidException.defensive(e, "Unable to serialize value [%s]", val);
+    }
+  }
+
   @Override
   public <T extends Comparable<T>> TypeStrategy<T> getTypeStrategy()
   {
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumn.java b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumn.java
index 6ade80b6e83e..e116a70d28d2 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumn.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumn.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.segment.nested;
 
+import org.apache.druid.collections.bitmap.BitmapFactory;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.common.semantic.SemanticUtils;
@@ -27,6 +28,7 @@
 import org.apache.druid.segment.DoubleColumnSelector;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.data.ColumnarDoubles;
+import org.apache.druid.segment.data.ColumnarInts;
 import org.apache.druid.segment.data.FixedIndexed;
 import org.apache.druid.segment.data.Indexed;
 import org.apache.druid.segment.data.ReadableOffset;
@@ -40,6 +42,7 @@
 import javax.annotation.Nullable;
 import java.util.Map;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 /**
  * {@link NestedCommonFormatColumn} for {@link ColumnType#DOUBLE}
@@ -50,18 +53,24 @@ public class ScalarDoubleColumn implements NestedCommonFormatColumn
   private static final Map<Class<?>, Function<ScalarDoubleColumn, ?>> AS_MAP =
       SemanticUtils.makeAsMap(ScalarDoubleColumn.class);
 
   private final FixedIndexed<Double> doubleDictionary;
+  private final Supplier<ColumnarInts> encodedValuesSupplier;
   private final ColumnarDoubles valueColumn;
-  private final ImmutableBitmap nullValueIndex;
+  private final ImmutableBitmap nullValueBitmap;
+  private final BitmapFactory bitmapFactory;
 
   public ScalarDoubleColumn(
       FixedIndexed<Double> doubleDictionary,
+
Supplier encodedValuesSupplier, ColumnarDoubles valueColumn, - ImmutableBitmap nullValueIndex + ImmutableBitmap nullValueBitmap, + BitmapFactory bitmapFactory ) { this.doubleDictionary = doubleDictionary; + this.encodedValuesSupplier = encodedValuesSupplier; this.valueColumn = valueColumn; - this.nullValueIndex = nullValueIndex; + this.nullValueBitmap = nullValueBitmap; + this.bitmapFactory = bitmapFactory; } @Override @@ -81,7 +90,7 @@ public ColumnValueSelector makeColumnValueSelector(ReadableOffset offset) { return new DoubleColumnSelector() { - private PeekableIntIterator nullIterator = nullValueIndex.peekableIterator(); + private PeekableIntIterator nullIterator = nullValueBitmap.peekableIterator(); private int nullMark = -1; private int offsetMark = -1; @@ -95,7 +104,7 @@ public double getDouble() public void inspectRuntimeShape(RuntimeShapeInspector inspector) { inspector.visit("doubleColumn", valueColumn); - inspector.visit("nullBitmap", nullValueIndex); + inspector.visit("nullBitmap", nullValueBitmap); } @Override @@ -108,7 +117,7 @@ public boolean isNull() if (i < offsetMark) { // offset was reset, reset iterator state nullMark = -1; - nullIterator = nullValueIndex.peekableIterator(); + nullIterator = nullValueBitmap.peekableIterator(); } offsetMark = i; if (nullMark < i) { @@ -133,7 +142,7 @@ public VectorValueSelector makeVectorValueSelector(ReadableVectorOffset offset) private int id = ReadableVectorInspector.NULL_ID; @Nullable - private PeekableIntIterator nullIterator = nullValueIndex != null ? nullValueIndex.peekableIterator() : null; + private PeekableIntIterator nullIterator = nullValueBitmap != null ? nullValueBitmap.peekableIterator() : null; private int offsetMark = -1; @Override @@ -162,14 +171,14 @@ private void computeVectorsIfNeeded() if (offset.isContiguous()) { if (offset.getStartOffset() < offsetMark) { - nullIterator = nullValueIndex.peekableIterator(); + nullIterator = nullValueBitmap.peekableIterator(); } offsetMark = offset.getStartOffset() + offset.getCurrentVectorSize(); valueColumn.get(valueVector, offset.getStartOffset(), offset.getCurrentVectorSize()); } else { final int[] offsets = offset.getOffsets(); if (offsets[offsets.length - 1] < offsetMark) { - nullIterator = nullValueIndex.peekableIterator(); + nullIterator = nullValueBitmap.peekableIterator(); } offsetMark = offsets[offsets.length - 1]; valueColumn.get(valueVector, offsets, offset.getCurrentVectorSize()); diff --git a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnAndIndexSupplier.java b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnAndIndexSupplier.java index 6fa2fe6d0f08..1bde18e188a8 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnAndIndexSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnAndIndexSupplier.java @@ -51,7 +51,9 @@ import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.data.BitmapSerdeFactory; import org.apache.druid.segment.data.ColumnarDoubles; +import org.apache.druid.segment.data.ColumnarInts; import org.apache.druid.segment.data.CompressedColumnarDoublesSuppliers; +import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSupplier; import org.apache.druid.segment.data.FixedIndexed; import org.apache.druid.segment.data.GenericIndexed; import org.apache.druid.segment.data.VByte; @@ -109,6 +111,11 @@ public static ScalarDoubleColumnAndIndexSupplier read( columnName, 
ColumnSerializerUtils.DOUBLE_VALUE_COLUMN_FILE_NAME ); + final ByteBuffer encodedValuesBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile( + mapper, + columnName, + ColumnSerializerUtils.ENCODED_VALUE_COLUMN_FILE_NAME + ); final Supplier> doubleDictionarySupplier; if (parent != null) { @@ -128,6 +135,12 @@ public static ScalarDoubleColumnAndIndexSupplier read( ); } + final CompressedVSizeColumnarIntsSupplier encodedCol = CompressedVSizeColumnarIntsSupplier.fromByteBuffer( + encodedValuesBuffer, + byteOrder, + columnBuilder.getFileMapper() + ); + final Supplier doubles = CompressedColumnarDoublesSuppliers.fromByteBuffer( doublesValueColumn, byteOrder @@ -144,6 +157,7 @@ public static ScalarDoubleColumnAndIndexSupplier read( ); return new ScalarDoubleColumnAndIndexSupplier( doubleDictionarySupplier, + encodedCol, doubles, rBitmaps, bitmapSerdeFactory.getBitmapFactory(), @@ -160,6 +174,7 @@ public static ScalarDoubleColumnAndIndexSupplier read( private final Supplier> doubleDictionarySupplier; + private final Supplier encodedValuesSupplier; private final Supplier valueColumnSupplier; private final GenericIndexed valueIndexes; @@ -170,6 +185,7 @@ public static ScalarDoubleColumnAndIndexSupplier read( private ScalarDoubleColumnAndIndexSupplier( Supplier> longDictionary, + Supplier encodedValuesSupplier, Supplier valueColumnSupplier, GenericIndexed valueIndexes, BitmapFactory bitmapFactory, @@ -177,6 +193,7 @@ private ScalarDoubleColumnAndIndexSupplier( ) { this.doubleDictionarySupplier = longDictionary; + this.encodedValuesSupplier = encodedValuesSupplier; this.valueColumnSupplier = valueColumnSupplier; this.valueIndexes = valueIndexes; this.bitmapFactory = bitmapFactory; @@ -189,8 +206,10 @@ public NestedCommonFormatColumn get() { return new ScalarDoubleColumn( doubleDictionarySupplier.get(), + encodedValuesSupplier, valueColumnSupplier.get(), - nullValueBitmap + nullValueBitmap, + bitmapFactory ); } diff --git a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumn.java b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumn.java index 8a54ff312780..4d5db6a45f45 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumn.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumn.java @@ -19,6 +19,7 @@ package org.apache.druid.segment.nested; +import org.apache.druid.collections.bitmap.BitmapFactory; import org.apache.druid.collections.bitmap.ImmutableBitmap; import org.apache.druid.common.config.NullHandling; import org.apache.druid.common.semantic.SemanticUtils; @@ -26,6 +27,7 @@ import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.LongColumnSelector; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.data.ColumnarInts; import org.apache.druid.segment.data.ColumnarLongs; import org.apache.druid.segment.data.FixedIndexed; import org.apache.druid.segment.data.Indexed; @@ -40,6 +42,7 @@ import javax.annotation.Nullable; import java.util.Map; import java.util.function.Function; +import java.util.function.Supplier; /** * {@link NestedCommonFormatColumn} for {@link ColumnType#LONG} @@ -50,18 +53,24 @@ public class ScalarLongColumn implements NestedCommonFormatColumn SemanticUtils.makeAsMap(ScalarLongColumn.class); private final FixedIndexed longDictionary; + private final Supplier encodedValuesSupplier; private final ColumnarLongs valueColumn; - private final ImmutableBitmap nullValueIndex; + private final ImmutableBitmap 
nullValueBitmap; + private final BitmapFactory bitmapFactory; public ScalarLongColumn( FixedIndexed longDictionary, + Supplier encodedValuesSupplier, ColumnarLongs valueColumn, - ImmutableBitmap nullValueIndex + ImmutableBitmap nullValueBitmap, + BitmapFactory bitmapFactory ) { this.longDictionary = longDictionary; + this.encodedValuesSupplier = encodedValuesSupplier; this.valueColumn = valueColumn; - this.nullValueIndex = nullValueIndex; + this.nullValueBitmap = nullValueBitmap; + this.bitmapFactory = bitmapFactory; } @@ -82,7 +91,7 @@ public ColumnValueSelector makeColumnValueSelector(ReadableOffset offset) { return new LongColumnSelector() { - private PeekableIntIterator nullIterator = nullValueIndex.peekableIterator(); + private PeekableIntIterator nullIterator = nullValueBitmap.peekableIterator(); private int nullMark = -1; private int offsetMark = -1; @@ -96,7 +105,7 @@ public long getLong() public void inspectRuntimeShape(RuntimeShapeInspector inspector) { inspector.visit("longColumn", valueColumn); - inspector.visit("nullBitmap", nullValueIndex); + inspector.visit("nullBitmap", nullValueBitmap); } @Override @@ -109,7 +118,7 @@ public boolean isNull() if (i < offsetMark) { // offset was reset, reset iterator state nullMark = -1; - nullIterator = nullValueIndex.peekableIterator(); + nullIterator = nullValueBitmap.peekableIterator(); } offsetMark = i; if (nullMark < i) { @@ -134,7 +143,7 @@ public VectorValueSelector makeVectorValueSelector(ReadableVectorOffset offset) private int id = ReadableVectorInspector.NULL_ID; @Nullable - private PeekableIntIterator nullIterator = nullValueIndex.peekableIterator(); + private PeekableIntIterator nullIterator = nullValueBitmap.peekableIterator(); private int offsetMark = -1; @Override @@ -163,14 +172,14 @@ private void computeVectorsIfNeeded() if (offset.isContiguous()) { if (offset.getStartOffset() < offsetMark) { - nullIterator = nullValueIndex.peekableIterator(); + nullIterator = nullValueBitmap.peekableIterator(); } offsetMark = offset.getStartOffset() + offset.getCurrentVectorSize(); valueColumn.get(valueVector, offset.getStartOffset(), offset.getCurrentVectorSize()); } else { final int[] offsets = offset.getOffsets(); if (offsets[offsets.length - 1] < offsetMark) { - nullIterator = nullValueIndex.peekableIterator(); + nullIterator = nullValueBitmap.peekableIterator(); } offsetMark = offsets[offsets.length - 1]; valueColumn.get(valueVector, offsets, offset.getCurrentVectorSize()); diff --git a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnAndIndexSupplier.java b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnAndIndexSupplier.java index a8d1fa057d86..1b1b9fb97e7b 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnAndIndexSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnAndIndexSupplier.java @@ -49,8 +49,10 @@ import org.apache.druid.segment.column.TypeSignature; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.data.BitmapSerdeFactory; +import org.apache.druid.segment.data.ColumnarInts; import org.apache.druid.segment.data.ColumnarLongs; import org.apache.druid.segment.data.CompressedColumnarLongsSupplier; +import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSupplier; import org.apache.druid.segment.data.FixedIndexed; import org.apache.druid.segment.data.GenericIndexed; import org.apache.druid.segment.data.VByte; @@ -103,6 +105,11 @@ public static 
ScalarLongColumnAndIndexSupplier read( final SmooshedFileMapper mapper = columnBuilder.getFileMapper(); + final ByteBuffer encodedValuesBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile( + mapper, + columnName, + ColumnSerializerUtils.ENCODED_VALUE_COLUMN_FILE_NAME + ); final ByteBuffer longsValueColumn = NestedCommonFormatColumnPartSerde.loadInternalFile( mapper, columnName, @@ -137,12 +144,19 @@ public static ScalarLongColumnAndIndexSupplier read( ); } + final CompressedVSizeColumnarIntsSupplier encodedCol = CompressedVSizeColumnarIntsSupplier.fromByteBuffer( + encodedValuesBuffer, + byteOrder, + columnBuilder.getFileMapper() + ); + final Supplier longs = CompressedColumnarLongsSupplier.fromByteBuffer( longsValueColumn, byteOrder ); return new ScalarLongColumnAndIndexSupplier( longDictionarySupplier, + encodedCol, longs, rBitmaps, bitmapSerdeFactory.getBitmapFactory(), @@ -159,6 +173,7 @@ public static ScalarLongColumnAndIndexSupplier read( private final Supplier> longDictionarySupplier; + private final Supplier encodedValuesSupplier; private final Supplier valueColumnSupplier; private final GenericIndexed valueIndexes; @@ -170,6 +185,7 @@ public static ScalarLongColumnAndIndexSupplier read( private ScalarLongColumnAndIndexSupplier( Supplier> longDictionarySupplier, + Supplier encodedValuesSupplier, Supplier valueColumnSupplier, GenericIndexed valueIndexes, BitmapFactory bitmapFactory, @@ -177,6 +193,7 @@ private ScalarLongColumnAndIndexSupplier( ) { this.longDictionarySupplier = longDictionarySupplier; + this.encodedValuesSupplier = encodedValuesSupplier; this.valueColumnSupplier = valueColumnSupplier; this.valueIndexes = valueIndexes; this.bitmapFactory = bitmapFactory; @@ -189,8 +206,10 @@ public NestedCommonFormatColumn get() { return new ScalarLongColumn( longDictionarySupplier.get(), + encodedValuesSupplier, valueColumnSupplier.get(), - nullValueBitmap + nullValueBitmap, + bitmapFactory ); } diff --git a/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnAndIndexSupplier.java b/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnAndIndexSupplier.java index 7f1111c2e8b5..ee7fe1475b73 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnAndIndexSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnAndIndexSupplier.java @@ -117,6 +117,7 @@ public static ScalarStringColumnAndIndexSupplier read( private final Supplier encodedColumnSupplier; private final GenericIndexed valueIndexes; private final ColumnIndexSupplier stringIndexSupplier; + private final BitmapSerdeFactory serdeFactory; private ScalarStringColumnAndIndexSupplier( Supplier> dictionarySupplier, @@ -128,6 +129,7 @@ private ScalarStringColumnAndIndexSupplier( this.dictionarySupplier = dictionarySupplier; this.encodedColumnSupplier = encodedColumnSupplier; this.valueIndexes = valueIndexes; + this.serdeFactory = serdeFactory; this.stringIndexSupplier = new StringUtf8ColumnIndexSupplier<>( serdeFactory.getBitmapFactory(), dictionarySupplier, @@ -139,7 +141,12 @@ private ScalarStringColumnAndIndexSupplier( @Override public NestedCommonFormatColumn get() { - return new StringUtf8DictionaryEncodedColumn(encodedColumnSupplier.get(), null, dictionarySupplier.get()); + return new StringUtf8DictionaryEncodedColumn( + encodedColumnSupplier.get(), + null, + dictionarySupplier.get(), + serdeFactory.getBitmapFactory() + ); } @Nullable diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java index dfd6307148dd..8125fe531006 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java @@ -23,6 +23,7 @@ import com.google.common.primitives.Floats; import it.unimi.dsi.fastutil.ints.IntArraySet; import it.unimi.dsi.fastutil.ints.IntSet; +import org.apache.druid.collections.bitmap.BitmapFactory; import org.apache.druid.collections.bitmap.ImmutableBitmap; import org.apache.druid.common.guava.GuavaUtils; import org.apache.druid.common.semantic.SemanticUtils; @@ -96,6 +97,7 @@ public class VariantColumn> private final ExpressionType logicalExpressionType; @Nullable private final FieldTypeInfo.TypeSet variantTypes; + private final BitmapFactory bitmapFactory; private final int adjustLongId; private final int adjustDoubleId; private final int adjustArrayId; @@ -108,7 +110,8 @@ public VariantColumn( ColumnarInts encodedValueColumn, ImmutableBitmap nullValueBitmap, ColumnType logicalType, - @Nullable Byte variantTypeSetByte + @Nullable Byte variantTypeSetByte, + BitmapFactory bitmapFactory ) { this.stringDictionary = stringDictionary; @@ -119,6 +122,7 @@ public VariantColumn( this.nullValueBitmap = nullValueBitmap; this.logicalExpressionType = ExpressionType.fromColumnTypeStrict(logicalType); this.variantTypes = variantTypeSetByte == null ? null : new FieldTypeInfo.TypeSet(variantTypeSetByte); + this.bitmapFactory = bitmapFactory; // use the variant type bytes if set, in current code the logical type should have been computed via this same means // however older versions of the code had a bug which could incorrectly classify mixed types as nested data if (variantTypeSetByte != null) { diff --git a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java index b93235b5babf..6a2ec4769762 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java @@ -268,7 +268,8 @@ public NestedCommonFormatColumn get() encodedValueColumnSupplier.get(), nullValueBitmap, logicalType, - variantTypeSetByte + variantTypeSetByte, + bitmapFactory ); } diff --git a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java index 9adb8fac2b75..5b706b44b7fe 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java @@ -20,12 +20,11 @@ package org.apache.druid.segment.nested; import com.google.common.base.Preconditions; -import it.unimi.dsi.fastutil.ints.Int2ObjectMap; -import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap; import it.unimi.dsi.fastutil.ints.IntIterator; import org.apache.druid.collections.bitmap.ImmutableBitmap; import org.apache.druid.collections.bitmap.MutableBitmap; import org.apache.druid.common.config.NullHandling; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; @@ -47,6 +46,7 @@ 
import org.apache.druid.segment.data.GenericIndexedWriter;
 import org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
 import org.apache.druid.segment.serde.ColumnSerializerUtils;
+import org.apache.druid.segment.serde.Serializer;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
 import javax.annotation.Nullable;
@@ -55,6 +55,7 @@
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
 
 /**
  * Serializer for a {@link NestedCommonFormatColumn} for single type arrays and mixed type columns, but not columns
@@ -80,7 +81,6 @@ public class VariantColumnSerializer extends NestedCommonFormatColumnSerializer
 
   private boolean dictionarySerialized = false;
   private FixedIndexedIntWriter intermediateValueWriter;
-  private ByteBuffer columnNameBytes = null;
   private boolean hasNulls;
   private boolean writeDictionary = true;
   @Nullable
@@ -88,6 +88,8 @@ public class VariantColumnSerializer extends NestedCommonFormatColumnSerializer
   @Nullable
   private final Byte variantTypeSetByte;
 
+  private InternalSerializer internalSerializer = null;
+
   public VariantColumnSerializer(
       String name,
       @Nullable ColumnType logicalType,
@@ -299,155 +301,246 @@ public void serialize(ColumnValueSelector selector) th
     }
   }
 
-  private void closeForWrite()
+  private void closeForWrite() throws IOException
   {
     if (!closedForWrite) {
-      columnNameBytes = computeFilenameBytes();
+      // Write out the compressed dictionaryId int column, bitmap indexes, and array element bitmap indexes by
+      // iterating the intermediate value column. The intermediate value column should be replaced someday by a cooler
+      // compressed int column writer that allows easy iteration of the values it writes out, so that we could just
+      // build the bitmap indexes here instead of doing both things.
+      String filenameBase = StringUtils.format("%s.forward_dim", name);
+      final int scalarCardinality = dictionaryIdLookup.getStringCardinality()
+                                    + dictionaryIdLookup.getLongCardinality()
+                                    + dictionaryIdLookup.getDoubleCardinality();
+      final int cardinality = scalarCardinality + dictionaryIdLookup.getArrayCardinality();
+      final CompressionStrategy compression = indexSpec.getDimensionCompression();
+      final CompressionStrategy compressionToUse;
+      if (compression != CompressionStrategy.UNCOMPRESSED && compression != CompressionStrategy.NONE) {
+        compressionToUse = compression;
+      } else {
+        compressionToUse = CompressionStrategy.LZ4;
+      }
+
+      final SingleValueColumnarIntsSerializer encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
+          name,
+          segmentWriteOutMedium,
+          filenameBase,
+          cardinality,
+          compressionToUse,
+          segmentWriteOutMedium.getCloser()
+      );
+      encodedValueSerializer.open();
+
+      final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new GenericIndexedWriter<>(
+          segmentWriteOutMedium,
+          name,
+          indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+      );
+      bitmapIndexWriter.open();
+      bitmapIndexWriter.setObjectsNotSorted();
+      final MutableBitmap[] bitmaps = new MutableBitmap[cardinality];
+      final MutableBitmap[] arrayElements = new MutableBitmap[scalarCardinality];
+      for (int i = 0; i < bitmaps.length; i++) {
+        bitmaps[i] = indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+      }
+      final GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter = new GenericIndexedWriter<>(
+          segmentWriteOutMedium,
+          name + "_arrays",
+          indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+      );
+      arrayElementIndexWriter.open();
+      arrayElementIndexWriter.setObjectsNotSorted();
+
+
final IntIterator rows = intermediateValueWriter.getIterator(); + int rowCount = 0; + final int arrayBaseId = dictionaryIdLookup.getStringCardinality() + + dictionaryIdLookup.getLongCardinality() + + dictionaryIdLookup.getDoubleCardinality(); + while (rows.hasNext()) { + final int dictId = rows.nextInt(); + encodedValueSerializer.addValue(dictId); + bitmaps[dictId].add(rowCount); + if (dictId >= arrayBaseId) { + int[] array = dictionaryIdLookup.getArrayValue(dictId); + for (int elementId : array) { + MutableBitmap bitmap = arrayElements[elementId]; + if (bitmap == null) { + bitmap = indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap(); + arrayElements[elementId] = bitmap; + } + bitmap.add(rowCount); + } + } + rowCount++; + } + + for (int i = 0; i < bitmaps.length; i++) { + final MutableBitmap bitmap = bitmaps[i]; + bitmapIndexWriter.write( + indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap) + ); + bitmaps[i] = null; // Reclaim memory + } + if (writeDictionary) { + for (int i = 0; i < arrayElements.length; ++i) { + if (arrayElements[i] != null) { + arrayElementDictionaryWriter.write(i); + arrayElementIndexWriter.write(arrayElements[i]); + } + } + } + closedForWrite = true; + internalSerializer = new InternalSerializer( + name, + variantTypeSetByte, + dictionaryWriter, + longDictionaryWriter, + doubleDictionaryWriter, + arrayDictionaryWriter, + encodedValueSerializer, + bitmapIndexWriter, + arrayElementDictionaryWriter, + arrayElementIndexWriter, + dictionaryIdLookup, + writeDictionary + ); } } @Override - public long getSerializedSize() + public long getSerializedSize() throws IOException { closeForWrite(); - - long size = 1 + columnNameBytes.capacity(); - // the value dictionaries, raw column, and null index are all stored in separate files - if (variantTypeSetByte != null) { - size += 1; - } - return size; + return internalSerializer.getSerializedSize(); } @Override - public void writeTo( - WritableByteChannel channel, - FileSmoosher smoosher - ) throws IOException + public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException { - Preconditions.checkState(closedForWrite, "Not closed yet!"); - if (writeDictionary) { - Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not sorted?!?"); - } - - // write out compressed dictionaryId int column, bitmap indexes, and array element bitmap indexes - // by iterating intermediate value column the intermediate value column should be replaced someday by a cooler - // compressed int column writer that allows easy iteration of the values it writes out, so that we could just - // build the bitmap indexes here instead of doing both things - String filenameBase = StringUtils.format("%s.forward_dim", name); - final int cardinality = dictionaryIdLookup.getStringCardinality() - + dictionaryIdLookup.getLongCardinality() - + dictionaryIdLookup.getDoubleCardinality() - + dictionaryIdLookup.getArrayCardinality(); - final CompressionStrategy compression = indexSpec.getDimensionCompression(); - final CompressionStrategy compressionToUse; - if (compression != CompressionStrategy.UNCOMPRESSED && compression != CompressionStrategy.NONE) { - compressionToUse = compression; - } else { - compressionToUse = CompressionStrategy.LZ4; - } - - final SingleValueColumnarIntsSerializer encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create( - name, - segmentWriteOutMedium, - filenameBase, - cardinality, - compressionToUse, - segmentWriteOutMedium.getCloser() - ); 
-    encodedValueSerializer.open();
-
-    final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new GenericIndexedWriter<>(
-        segmentWriteOutMedium,
-        name,
-        indexSpec.getBitmapSerdeFactory().getObjectStrategy()
-    );
-    bitmapIndexWriter.open();
-    bitmapIndexWriter.setObjectsNotSorted();
-    final MutableBitmap[] bitmaps = new MutableBitmap[cardinality];
-    final Int2ObjectRBTreeMap<MutableBitmap> arrayElements = new Int2ObjectRBTreeMap<>();
-    for (int i = 0; i < bitmaps.length; i++) {
-      bitmaps[i] = indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
-    }
-    final GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter = new GenericIndexedWriter<>(
-        segmentWriteOutMedium,
-        name + "_arrays",
-        indexSpec.getBitmapSerdeFactory().getObjectStrategy()
-    );
-    arrayElementIndexWriter.open();
-    arrayElementIndexWriter.setObjectsNotSorted();
-
-    final IntIterator rows = intermediateValueWriter.getIterator();
-    int rowCount = 0;
-    final int arrayBaseId = dictionaryIdLookup.getStringCardinality()
-                          + dictionaryIdLookup.getLongCardinality()
-                          + dictionaryIdLookup.getDoubleCardinality();
-    while (rows.hasNext()) {
-      final int dictId = rows.nextInt();
-      encodedValueSerializer.addValue(dictId);
-      bitmaps[dictId].add(rowCount);
-      if (dictId >= arrayBaseId) {
-        int[] array = dictionaryIdLookup.getArrayValue(dictId);
-        for (int elementId : array) {
-          arrayElements.computeIfAbsent(
-              elementId,
-              (id) -> indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap()
-          ).add(rowCount);
+  /**
+   * Internal serializer used to serialize a {@link VariantColumn}. Contains the logic to write out the column to a
+   * {@link FileSmoosher}. Created by {@link VariantColumnSerializer} once it is closed for writes.
+ */ + public static class InternalSerializer implements Serializer + { + private final String columnName; + private final ByteBuffer columnNameBytes; + private final Byte variantTypeSetByte; + + private final DictionaryWriter dictionaryWriter; + private final FixedIndexedWriter longDictionaryWriter; + private final FixedIndexedWriter doubleDictionaryWriter; + private final FrontCodedIntArrayIndexedWriter arrayDictionaryWriter; + + private final SingleValueColumnarIntsSerializer encodedValueSerializer; + private final GenericIndexedWriter bitmapIndexWriter; + + private final FixedIndexedIntWriter arrayElementDictionaryWriter; + private final GenericIndexedWriter arrayElementIndexWriter; + private final boolean writeDictionary; + + private final DictionaryIdLookup dictionaryIdLookup; + + public InternalSerializer( + String columnName, + Byte variantTypeSetByte, + DictionaryWriter dictionaryWriter, + FixedIndexedWriter longDictionaryWriter, + FixedIndexedWriter doubleDictionaryWriter, + FrontCodedIntArrayIndexedWriter arrayDictionaryWriter, + SingleValueColumnarIntsSerializer encodedValueSerializer, + GenericIndexedWriter bitmapIndexWriter, + FixedIndexedIntWriter arrayElementDictionaryWriter, + GenericIndexedWriter arrayElementIndexWriter, + DictionaryIdLookup dictionaryIdLookup, + boolean writeDictionary + ) + { + this.columnName = columnName; + this.columnNameBytes = ColumnSerializerUtils.stringToUtf8InVSizeByteBuffer(columnName); + this.variantTypeSetByte = variantTypeSetByte; + this.dictionaryWriter = dictionaryWriter; + this.longDictionaryWriter = longDictionaryWriter; + this.doubleDictionaryWriter = doubleDictionaryWriter; + this.arrayDictionaryWriter = arrayDictionaryWriter; + this.encodedValueSerializer = encodedValueSerializer; + this.bitmapIndexWriter = bitmapIndexWriter; + this.arrayElementDictionaryWriter = arrayElementDictionaryWriter; + this.arrayElementIndexWriter = arrayElementIndexWriter; + this.writeDictionary = writeDictionary; + this.dictionaryIdLookup = dictionaryIdLookup; + + boolean[] dictionariesSorted = new boolean[]{ + dictionaryWriter.isSorted(), + longDictionaryWriter.isSorted(), + doubleDictionaryWriter.isSorted(), + arrayDictionaryWriter.isSorted() + }; + for (boolean sorted : dictionariesSorted) { + if (writeDictionary && !sorted) { + throw DruidException.defensive( + "Dictionary is not sorted? 
[%s] Should always be sorted", + Arrays.toString(dictionariesSorted) + ); } } - rowCount++; } - for (int i = 0; i < bitmaps.length; i++) { - final MutableBitmap bitmap = bitmaps[i]; - bitmapIndexWriter.write( - indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap) - ); - bitmaps[i] = null; // Reclaim memory - } - if (writeDictionary) { - for (Int2ObjectMap.Entry arrayElement : arrayElements.int2ObjectEntrySet()) { - arrayElementDictionaryWriter.write(arrayElement.getIntKey()); - arrayElementIndexWriter.write( - indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(arrayElement.getValue()) - ); + @Override + public long getSerializedSize() + { + long size = 1 + columnNameBytes.capacity(); + // the value dictionaries, indexes, array element indexes and dictionary id columns are all stored in separate files + if (variantTypeSetByte != null) { + size += 1; } + return size; } - writeV0Header(channel, columnNameBytes); - if (variantTypeSetByte != null) { - channel.write(ByteBuffer.wrap(new byte[]{variantTypeSetByte})); - } - - if (writeDictionary) { - if (dictionaryIdLookup.getStringBufferMapper() != null) { - copyFromTempSmoosh(smoosher, dictionaryIdLookup.getStringBufferMapper()); - } else { - writeInternal(smoosher, dictionaryWriter, ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME); - } - if (dictionaryIdLookup.getLongBufferMapper() != null) { - copyFromTempSmoosh(smoosher, dictionaryIdLookup.getLongBufferMapper()); - } else { - writeInternal(smoosher, longDictionaryWriter, ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME); - } - if (dictionaryIdLookup.getDoubleBufferMapper() != null) { - copyFromTempSmoosh(smoosher, dictionaryIdLookup.getDoubleBufferMapper()); - } else { - writeInternal(smoosher, doubleDictionaryWriter, ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME); + @Override + public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException + { + writeV0Header(channel, columnNameBytes); + if (variantTypeSetByte != null) { + channel.write(ByteBuffer.wrap(new byte[]{variantTypeSetByte})); } - if (dictionaryIdLookup.getArrayBufferMapper() != null) { - copyFromTempSmoosh(smoosher, dictionaryIdLookup.getArrayBufferMapper()); - } else { - writeInternal(smoosher, arrayDictionaryWriter, ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME); + + if (writeDictionary) { + if (dictionaryIdLookup.getStringBufferMapper() != null) { + copyFromTempSmoosh(smoosher, dictionaryIdLookup.getStringBufferMapper()); + } else { + ColumnSerializerUtils.writeInternal(smoosher, dictionaryWriter, columnName, ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME); + } + + if (dictionaryIdLookup.getLongBufferMapper() != null) { + copyFromTempSmoosh(smoosher, dictionaryIdLookup.getLongBufferMapper()); + } else { + ColumnSerializerUtils.writeInternal(smoosher, longDictionaryWriter, columnName, ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME); + } + + if (dictionaryIdLookup.getDoubleBufferMapper() != null) { + copyFromTempSmoosh(smoosher, dictionaryIdLookup.getDoubleBufferMapper()); + } else { + ColumnSerializerUtils.writeInternal(smoosher, doubleDictionaryWriter, columnName, ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME); + } + if (dictionaryIdLookup.getArrayBufferMapper() != null) { + copyFromTempSmoosh(smoosher, dictionaryIdLookup.getArrayBufferMapper()); + } else { + ColumnSerializerUtils.writeInternal(smoosher, arrayDictionaryWriter, columnName, ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME); + } + + 
ColumnSerializerUtils.writeInternal(smoosher, arrayElementDictionaryWriter, columnName, ColumnSerializerUtils.ARRAY_ELEMENT_DICTIONARY_FILE_NAME); } + ColumnSerializerUtils.writeInternal(smoosher, encodedValueSerializer, columnName, ColumnSerializerUtils.ENCODED_VALUE_COLUMN_FILE_NAME); + ColumnSerializerUtils.writeInternal(smoosher, bitmapIndexWriter, columnName, ColumnSerializerUtils.BITMAP_INDEX_FILE_NAME); + ColumnSerializerUtils.writeInternal(smoosher, arrayElementIndexWriter, columnName, ColumnSerializerUtils.ARRAY_ELEMENT_BITMAP_INDEX_FILE_NAME); - writeInternal(smoosher, arrayElementDictionaryWriter, ColumnSerializerUtils.ARRAY_ELEMENT_DICTIONARY_FILE_NAME); + log.info("Column [%s] serialized successfully.", columnName); } - writeInternal(smoosher, encodedValueSerializer, ColumnSerializerUtils.ENCODED_VALUE_COLUMN_FILE_NAME); - writeInternal(smoosher, bitmapIndexWriter, ColumnSerializerUtils.BITMAP_INDEX_FILE_NAME); - writeInternal(smoosher, arrayElementIndexWriter, ColumnSerializerUtils.ARRAY_ELEMENT_BITMAP_INDEX_FILE_NAME); - - log.info("Column [%s] serialized successfully.", name); } } diff --git a/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java index cf9b7c70d563..02e7f5b4d397 100644 --- a/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java +++ b/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java @@ -344,7 +344,8 @@ public void read( final StringUtf8DictionaryEncodedColumnSupplier supplier = new StringUtf8DictionaryEncodedColumnSupplier<>( dictionarySupplier, rSingleValuedColumn, - rMultiValuedColumn + rMultiValuedColumn, + bitmapSerdeFactory.getBitmapFactory() ); builder.setHasMultipleValues(hasMultipleValues) .setHasNulls(hasNulls) diff --git a/processing/src/main/java/org/apache/druid/segment/serde/StringUtf8DictionaryEncodedColumnSupplier.java b/processing/src/main/java/org/apache/druid/segment/serde/StringUtf8DictionaryEncodedColumnSupplier.java index 33de1869e273..6d443ebf11d9 100644 --- a/processing/src/main/java/org/apache/druid/segment/serde/StringUtf8DictionaryEncodedColumnSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/serde/StringUtf8DictionaryEncodedColumnSupplier.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.serde; import com.google.common.base.Supplier; +import org.apache.druid.collections.bitmap.BitmapFactory; import org.apache.druid.common.config.NullHandling; import org.apache.druid.segment.column.DictionaryEncodedColumn; import org.apache.druid.segment.column.StringUtf8DictionaryEncodedColumn; @@ -38,16 +39,19 @@ public class StringUtf8DictionaryEncodedColumnSupplier utf8Dictionary; private final @Nullable Supplier singleValuedColumn; private final @Nullable Supplier multiValuedColumn; + private final BitmapFactory bitmapFactory; public StringUtf8DictionaryEncodedColumnSupplier( Supplier utf8Dictionary, @Nullable Supplier singleValuedColumn, - @Nullable Supplier multiValuedColumn + @Nullable Supplier multiValuedColumn, + BitmapFactory bitmapFactory ) { this.utf8Dictionary = utf8Dictionary; this.singleValuedColumn = singleValuedColumn; this.multiValuedColumn = multiValuedColumn; + this.bitmapFactory = bitmapFactory; } public Supplier getDictionary() @@ -64,19 +68,22 @@ public DictionaryEncodedColumn get() return new StringUtf8DictionaryEncodedColumn( singleValuedColumn != null ? 
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/IOIterator.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/IOIterator.java
index 3931601dd4f3..0825e0e46bd6 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/cell/IOIterator.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/IOIterator.java
@@ -30,10 +30,10 @@
  */
 public interface IOIterator<T> extends Closeable
 {
-  boolean hasNext() throws IOException;
+  boolean hasNext();
 
   T next() throws IOException;
 
   @Override
-  void close() throws IOException;
+  void close();
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/StagedSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/StagedSerde.java
index ffbf00a9c409..476795b9f5e0 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/cell/StagedSerde.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/StagedSerde.java
@@ -64,6 +64,41 @@
  */
 public interface StagedSerde<T>
 {
+  static StagedSerde<byte[]> forBytes()
+  {
+    return new StagedSerde<byte[]>()
+    {
+      @Override
+      public byte[] serialize(byte[] value)
+      {
+        return value;
+      }
+
+      @Override
+      public byte[] deserialize(byte[] bytes)
+      {
+        return bytes;
+      }
+
+      @Override
+      public StorableBuffer serializeDelayed(@Nullable byte[] value)
+      {
+        throw new UnsupportedOperationException();
+      }
+
+      @Nullable
+      @Override
+      public byte[] deserialize(ByteBuffer byteBuffer)
+      {
+        byte[] retVal = new byte[byteBuffer.remaining()];
+        int position = byteBuffer.position();
+        byteBuffer.get(retVal);
+        byteBuffer.position(position);
+        return retVal;
+      }
+    };
+  }
+
   /**
    * Useful method when some computation is necessary to prepare for serialization without actually writing out
    * all the bytes in order to determine the serialized size. It allows encapsulation of the size computation and
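The new forBytes() serde is an identity pass-through for callers whose payloads are already byte arrays: serialize() and deserialize(byte[]) return their argument unchanged, and only the ByteBuffer overload does any work, deliberately restoring the buffer's position so the same buffer can be read again. A small usage sketch (variable names are illustrative):

    StagedSerde<byte[]> serde = StagedSerde.forBytes();

    byte[] stored = serde.serialize(new byte[]{1, 2, 3});   // returns the same array, no copy
    ByteBuffer buffer = ByteBuffer.wrap(stored);

    byte[] copy = serde.deserialize(buffer);                // copies remaining() bytes out
    // buffer.position() is still 0 here: deserialize() saved and restored it.

    // serializeDelayed() intentionally throws UnsupportedOperationException, since a
    // caller that already holds a byte[] never needs two-phase size planning.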
diff --git a/processing/src/test/java/org/apache/druid/segment/filter/PredicateValueMatcherFactoryTest.java b/processing/src/test/java/org/apache/druid/segment/filter/PredicateValueMatcherFactoryTest.java
index ff4dc11f220f..0a79b97f523e 100644
--- a/processing/src/test/java/org/apache/druid/segment/filter/PredicateValueMatcherFactoryTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/filter/PredicateValueMatcherFactoryTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.segment.filter;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.query.filter.SelectorPredicateFactory;
@@ -76,7 +77,8 @@ public void testDimensionProcessorMultiValuedDimensionMatchingValue()
             GenericIndexed.UTF8_STRATEGY
         )::singleThreaded,
         null,
-        () -> VSizeColumnarMultiInts.fromIterable(ImmutableList.of(VSizeColumnarInts.fromArray(new int[]{1})))
+        () -> VSizeColumnarMultiInts.fromIterable(ImmutableList.of(VSizeColumnarInts.fromArray(new int[]{1}))),
+        new RoaringBitmapFactory()
     );
     final ValueMatcher matcher = forSelector("v2")
         .makeDimensionProcessor(columnSupplier.get().makeDimensionSelector(new SimpleAscendingOffset(1), null), true);
@@ -97,7 +99,8 @@ public void testDimensionProcessorMultiValuedDimensionNotMatchingValue()
             GenericIndexed.UTF8_STRATEGY
         )::singleThreaded,
         null,
-        () -> VSizeColumnarMultiInts.fromIterable(ImmutableList.of(VSizeColumnarInts.fromArray(new int[]{1})))
+        () -> VSizeColumnarMultiInts.fromIterable(ImmutableList.of(VSizeColumnarInts.fromArray(new int[]{1}))),
+        new RoaringBitmapFactory()
     );
     final ValueMatcher matcher = forSelector("v3")
         .makeDimensionProcessor(columnSupplier.get().makeDimensionSelector(new SimpleAscendingOffset(1), null), true);
diff --git a/processing/src/test/java/org/apache/druid/segment/filter/ValueMatchersTest.java b/processing/src/test/java/org/apache/druid/segment/filter/ValueMatchersTest.java
index 86e4e0ee08b3..bdb15ee445a1 100644
--- a/processing/src/test/java/org/apache/druid/segment/filter/ValueMatchersTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/filter/ValueMatchersTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.segment.filter;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.query.filter.DruidObjectPredicate;
 import org.apache.druid.query.filter.DruidPredicateMatch;
@@ -47,13 +48,15 @@ public class ValueMatchersTest extends InitializedNullHandlingTest
   @Before
   public void setup()
   {
+    final RoaringBitmapFactory bitmapFactory = new RoaringBitmapFactory();
     supplierSingleConstant = new StringUtf8DictionaryEncodedColumnSupplier<>(
         GenericIndexed.fromIterable(
            ImmutableList.of(ByteBuffer.wrap(StringUtils.toUtf8("value"))),
            GenericIndexed.UTF8_STRATEGY
        )::singleThreaded,
        () -> VSizeColumnarInts.fromArray(new int[]{0}),
-        null
+        null,
+        bitmapFactory
     );
     supplierSingle = new StringUtf8DictionaryEncodedColumnSupplier<>(
         GenericIndexed.fromIterable(
@@ -64,7 +67,8 @@ public void setup()
             GenericIndexed.UTF8_STRATEGY
         )::singleThreaded,
         () -> VSizeColumnarInts.fromArray(new int[]{0, 0, 1, 0, 1}),
-        null
+        null,
+        bitmapFactory
     );
     supplierMulti = new StringUtf8DictionaryEncodedColumnSupplier<>(
         GenericIndexed.fromIterable(
@@ -77,7 +81,8 @@ public void setup()
             VSizeColumnarInts.fromArray(new int[]{0, 0}),
             VSizeColumnarInts.fromArray(new int[]{0})
         )
-    )
+    ),
+    bitmapFactory
     );
   }
 
   @Test