diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/PinotDataBitSet.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/PinotDataBitSet.java index cc102113a8a3..4ed84a2c0c0f 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/PinotDataBitSet.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/PinotDataBitSet.java @@ -19,6 +19,8 @@ package org.apache.pinot.segment.local.io.util; import java.io.Closeable; +import java.util.List; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; @@ -216,6 +218,28 @@ public void unsetBit(int bitOffset) { _dataBuffer.putByte(byteOffset, (byte) (_dataBuffer.getByte(byteOffset) & (0xFF7F >>> bitOffsetInByte))); } + public int getNextSetBitOffsetRecordRanges(int bitOffset, long baseOffset, + List ranges) { + long startOffset = baseOffset + (bitOffset / Byte.SIZE); + int size = Byte.BYTES; + int byteOffset = bitOffset / Byte.SIZE; + int bitOffsetInFirstByte = bitOffset % Byte.SIZE; + int firstByte = (_dataBuffer.getByte(byteOffset) << bitOffsetInFirstByte) & BYTE_MASK; + if (firstByte != 0) { + ranges.add(new ForwardIndexReader.ByteRange(startOffset, size)); + return bitOffset + FIRST_BIT_SET[firstByte]; + } + while (true) { + byteOffset++; + size += Byte.SIZE; + int currentByte = _dataBuffer.getByte(byteOffset) & BYTE_MASK; + if (currentByte != 0) { + ranges.add(new ForwardIndexReader.ByteRange(startOffset, size)); + return (byteOffset * Byte.SIZE) | FIRST_BIT_SET[currentByte]; + } + } + } + public int getNextSetBitOffset(int bitOffset) { int byteOffset = bitOffset / Byte.SIZE; int bitOffsetInFirstByte = bitOffset % Byte.SIZE; @@ -232,6 +256,32 @@ public int getNextSetBitOffset(int bitOffset) { } } + public int getNextNthSetBitOffsetAndRecordRanges(int bitOffset, int n, long baseOffset, + List ranges) { + long startOffset = baseOffset + (bitOffset / Byte.SIZE); + int size = Byte.SIZE; + int byteOffset = bitOffset / Byte.SIZE; + int bitOffsetInFirstByte = bitOffset % Byte.SIZE; + ranges.add(new ForwardIndexReader.ByteRange(baseOffset + byteOffset, Byte.BYTES)); + int firstByte = (_dataBuffer.getByte(byteOffset) << bitOffsetInFirstByte) & BYTE_MASK; + int numBitsSet = NUM_BITS_SET[firstByte]; + if (numBitsSet >= n) { + ranges.add(new ForwardIndexReader.ByteRange(startOffset, size)); + return bitOffset + NTH_BIT_SET[n - 1][firstByte]; + } + while (true) { + n -= numBitsSet; + byteOffset++; + size += Byte.SIZE; + int currentByte = _dataBuffer.getByte(byteOffset) & BYTE_MASK; + numBitsSet = NUM_BITS_SET[currentByte]; + if (numBitsSet >= n) { + ranges.add(new ForwardIndexReader.ByteRange(startOffset, size)); + return (byteOffset * Byte.SIZE) | NTH_BIT_SET[n - 1][currentByte]; + } + } + } + public int getNextNthSetBitOffset(int bitOffset, int n) { int byteOffset = bitOffset / Byte.SIZE; int bitOffsetInFirstByte = bitOffset % Byte.SIZE; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java index 6de6e1294bdb..8408a9026da7 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java @@ -41,7 +41,15 @@ public class ForwardIndexReaderFactory extends IndexReaderFactory.Default { - public static final ForwardIndexReaderFactory INSTANCE = new ForwardIndexReaderFactory(); + private static volatile ForwardIndexReaderFactory _instance = new ForwardIndexReaderFactory(); + + public static void setInstance(ForwardIndexReaderFactory factory) { + _instance = factory; + } + + public static ForwardIndexReaderFactory getInstance() { + return _instance; + } @Override protected IndexType getIndexType() { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java index a32eff102b0a..d81a23362589 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java @@ -215,7 +215,7 @@ public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, Map createReaderFactory() { - return ForwardIndexReaderFactory.INSTANCE; + return ForwardIndexReaderFactory.getInstance(); } public String getFileExtension(ColumnMetadata columnMetadata) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java index 5e05346bbc2d..c7855ee54fd2 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java @@ -25,6 +25,8 @@ import java.nio.FloatBuffer; import java.nio.IntBuffer; import java.nio.LongBuffer; +import java.util.ArrayList; +import java.util.List; import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.segment.spi.compression.ChunkDecompressor; @@ -53,6 +55,8 @@ public abstract class BaseChunkForwardIndexReader implements ForwardIndexReader< protected final int _headerEntryChunkOffsetSize; protected final PinotDataBuffer _rawData; protected final boolean _isSingleValue; + protected final int _dataHeaderStart; + protected final int _rawDataStart; protected BaseChunkForwardIndexReader(PinotDataBuffer dataBuffer, DataType storedType, boolean isSingleValue) { _dataBuffer = dataBuffer; @@ -96,9 +100,11 @@ protected BaseChunkForwardIndexReader(PinotDataBuffer dataBuffer, DataType store // Slice out the header from the data buffer. int dataHeaderLength = _numChunks * _headerEntryChunkOffsetSize; int rawDataStart = dataHeaderStart + dataHeaderLength; + _dataHeaderStart = dataHeaderStart; _dataHeader = _dataBuffer.view(dataHeaderStart, rawDataStart); // Useful for uncompressed data. + _rawDataStart = rawDataStart; _rawData = _dataBuffer.view(rawDataStart, _dataBuffer.size()); _isSingleValue = isSingleValue; @@ -115,13 +121,86 @@ protected BaseChunkForwardIndexReader(PinotDataBuffer dataBuffer, DataType store * @return Chunk for the row */ protected ByteBuffer getChunkBuffer(int docId, ChunkReaderContext context) { - int chunkId = docId / _numDocsPerChunk; + int chunkId = getChunkId(docId); if (context.getChunkId() == chunkId) { return context.getChunkBuffer(); } return decompressChunk(chunkId, context); } + protected int getChunkId(int docId) { + return docId / _numDocsPerChunk; + } + + protected void recordDocIdRangesUncompressed(int docId, int rowOffsetSize, List ranges) { + int chunkId = getChunkId(docId); + int chunkRowId = docId % _numDocsPerChunk; + + // These offsets are offset in the data buffer + long chunkStartOffset = getChunkPositionAndRecordRanges(chunkId, ranges); + ranges.add(new ByteRange(chunkStartOffset + (long) chunkRowId * rowOffsetSize, Integer.BYTES)); + long valueStartOffset = chunkStartOffset + _dataBuffer.getInt(chunkStartOffset + (long) chunkRowId * rowOffsetSize); + long valueEndOffset = + getValueEndOffsetAndRecordRanges(chunkId, chunkRowId, rowOffsetSize, chunkStartOffset, ranges); + + ranges.add(new ByteRange(valueStartOffset, (int) (valueEndOffset - valueStartOffset))); + } + + protected long getValueEndOffsetAndRecordRanges(int chunkId, int chunkRowId, int rowOffsetSize, long chunkStartOffset, + List ranges) { + if (chunkId == _numChunks - 1) { + // Last chunk + if (chunkRowId == _numDocsPerChunk - 1) { + // Last row in the last chunk + return _dataBuffer.size(); + } else { + ranges.add(new ByteRange(chunkStartOffset + (long) (chunkRowId + 1) * rowOffsetSize, Integer.BYTES)); + int valueEndOffsetInChunk = _dataBuffer.getInt(chunkStartOffset + (long) (chunkRowId + 1) * rowOffsetSize); + if (valueEndOffsetInChunk == 0) { + // Last row in the last chunk (chunk is incomplete, which stores 0 as the offset for the absent rows) + return _dataBuffer.size(); + } else { + return chunkStartOffset + valueEndOffsetInChunk; + } + } + } else { + if (chunkRowId == _numDocsPerChunk - 1) { + // Last row in the chunk + return getChunkPositionAndRecordRanges(chunkId + 1, ranges); + } else { + ranges.add(new ByteRange(chunkStartOffset + (long) (chunkRowId + 1) * rowOffsetSize, Integer.BYTES)); + return chunkStartOffset + _dataBuffer.getInt(chunkStartOffset + (long) (chunkRowId + 1) * rowOffsetSize); + } + } + } + + protected void recordDocIdRanges(int docId, ChunkReaderContext context, List ranges) { + int chunkId = getChunkId(docId); + if (context.getChunkId() == chunkId) { + ranges.addAll(context.getRanges()); + return; + } + recordChunkRanges(chunkId, context, ranges); + } + + protected void recordChunkRanges(int chunkId, ChunkReaderContext context, List ranges) { + List chunkRanges = new ArrayList<>(); + int chunkSize; + long chunkPosition = getChunkPositionAndRecordRanges(chunkId, chunkRanges); + + // Size of chunk can be determined using next chunks offset, or end of data buffer for last chunk. + if (chunkId == (_numChunks - 1)) { // Last chunk. + chunkSize = (int) (_dataBuffer.size() - chunkPosition); + } else { + long nextChunkOffset = getChunkPositionAndRecordRanges(chunkId + 1, chunkRanges); + chunkSize = (int) (nextChunkOffset - chunkPosition); + } + chunkRanges.add(new ByteRange(chunkPosition, chunkSize)); + context.setChunkId(chunkId); + context.setRanges(chunkRanges); + ranges.addAll(chunkRanges); + } + protected ByteBuffer decompressChunk(int chunkId, ChunkReaderContext context) { int chunkSize; long chunkPosition = getChunkPosition(chunkId); @@ -160,6 +239,18 @@ protected long getChunkPosition(int chunkId) { } } + protected long getChunkPositionAndRecordRanges(int chunkId, List ranges) { + if (_headerEntryChunkOffsetSize == Integer.BYTES) { + ranges.add( + new ByteRange(_dataHeaderStart + chunkId * _headerEntryChunkOffsetSize, Integer.BYTES)); + return _dataHeader.getInt(chunkId * _headerEntryChunkOffsetSize); + } else { + ranges.add( + new ByteRange(_dataHeaderStart + chunkId * _headerEntryChunkOffsetSize, Long.BYTES)); + return _dataHeader.getLong(chunkId * _headerEntryChunkOffsetSize); + } + } + @Override public boolean isDictionaryEncoded() { return false; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/ChunkReaderContext.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/ChunkReaderContext.java index 1adf68558486..86a3bee79c3f 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/ChunkReaderContext.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/ChunkReaderContext.java @@ -20,6 +20,11 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import lombok.Getter; +import lombok.Setter; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext; import org.apache.pinot.segment.spi.memory.CleanerUtil; @@ -36,24 +41,21 @@ * */ public class ChunkReaderContext implements ForwardIndexReaderContext { + @Getter private final ByteBuffer _chunkBuffer; + + @Getter + @Setter private int _chunkId; + @Getter + @Setter + private List _ranges; + public ChunkReaderContext(int maxChunkSize) { _chunkBuffer = ByteBuffer.allocateDirect(maxChunkSize); _chunkId = -1; - } - - public ByteBuffer getChunkBuffer() { - return _chunkBuffer; - } - - public int getChunkId() { - return _chunkId; - } - - public void setChunkId(int chunkId) { - _chunkId = chunkId; + _ranges = new ArrayList<>(); } @Override diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVForwardIndexReader.java index 7a6f153768e1..983de9c6b991 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVForwardIndexReader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVForwardIndexReader.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.segment.local.segment.index.readers.forward; +import java.util.List; import org.apache.pinot.segment.local.io.util.FixedBitIntReaderWriter; import org.apache.pinot.segment.local.io.util.FixedByteValueReaderWriter; import org.apache.pinot.segment.local.io.util.PinotDataBitSet; @@ -58,16 +59,23 @@ public final class FixedBitMVForwardIndexReader implements ForwardIndexReader ranges) { + int contextDocId = context._docId; + int contextEndOffset = context._endOffset; + int startIndex; + if (docId == contextDocId + 1) { + startIndex = contextEndOffset; + } else { + int chunkId = docId / _numDocsPerChunk; + if (docId > contextDocId && chunkId == contextDocId / _numDocsPerChunk) { + // Same chunk + startIndex = + _bitmapReader.getNextNthSetBitOffsetAndRecordRanges(contextEndOffset + 1, docId - contextDocId - 1, + _bitmapReaderStartOffset, ranges); + } else { + // Different chunk + ranges.add(new ByteRange(chunkId, Integer.BYTES)); + int chunkOffset = _chunkOffsetReader.getInt(chunkId); + int indexInChunk = docId % _numDocsPerChunk; + if (indexInChunk == 0) { + startIndex = chunkOffset; + } else { + startIndex = _bitmapReader.getNextNthSetBitOffsetAndRecordRanges(chunkOffset + 1, indexInChunk, + _bitmapReaderStartOffset, ranges); + } + } + } + int endIndex; + if (docId == _numDocs - 1) { + endIndex = _numValues; + } else { + endIndex = _bitmapReader.getNextSetBitOffsetRecordRanges(startIndex + 1, _bitmapReaderStartOffset, ranges); + } + int numValues = endIndex - startIndex; + long startBitOffset = (long) startIndex * _numBitsPerValue; + long byteStartOffset = (startBitOffset / Byte.SIZE); + int size = (int) (((long) numValues * _numBitsPerValue + Byte.SIZE - 1) / Byte.SIZE); + + ranges.add(new ByteRange(_rawDataReaderStartOffset + byteStartOffset, size)); + + // Update context + context._docId = docId; + context._endOffset = endIndex; + } + + @Override + public boolean isFixedOffsetMappingType() { + return false; + } + + @Override + public long getRawDataStartOffset() { + throw new UnsupportedOperationException("Forward index is not fixed length type"); + } + + @Override + public int getDocLength() { + throw new UnsupportedOperationException("Forward index is not fixed length type"); + } + public static class Context implements ForwardIndexReaderContext { private int _docId = -1; // Exclusive diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitSVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitSVForwardIndexReader.java index 8f1b1c432a15..3adbedb00349 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitSVForwardIndexReader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitSVForwardIndexReader.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.segment.local.segment.index.readers.forward; +import java.util.List; import org.apache.pinot.segment.local.io.util.FixedBitIntReaderWriter; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext; @@ -31,9 +32,11 @@ */ public final class FixedBitSVForwardIndexReader implements ForwardIndexReader { private final FixedBitIntReaderWriter _reader; + private final int _numBitsPerValue; public FixedBitSVForwardIndexReader(PinotDataBuffer dataBuffer, int numDocs, int numBitsPerValue) { _reader = new FixedBitIntReaderWriter(dataBuffer, numDocs, numBitsPerValue); + _numBitsPerValue = numBitsPerValue; } @Override @@ -67,4 +70,34 @@ public void readDictIds(int[] docIds, int length, int[] dictIdBuffer, ForwardInd public void close() { _reader.close(); } + + @Override + public boolean isBufferByteRangeInfoSupported() { + return true; + } + + @Override + public void recordDocIdByteRanges(int docId, ForwardIndexReaderContext context, List ranges) { + throw new UnsupportedOperationException("Forward index is fixed length type"); + } + + @Override + public boolean isFixedOffsetMappingType() { + return true; + } + + @Override + public long getRawDataStartOffset() { + return 0; + } + + @Override + public int getDocLength() { + return _numBitsPerValue; + } + + @Override + public boolean isDocLengthInBits() { + return true; + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitSVForwardIndexReaderV2.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitSVForwardIndexReaderV2.java index 203e01a75c8d..191b1b62c401 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitSVForwardIndexReaderV2.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitSVForwardIndexReaderV2.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.segment.local.segment.index.readers.forward; +import java.util.List; import org.apache.pinot.segment.local.io.reader.impl.FixedBitIntReader; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext; @@ -32,10 +33,12 @@ public final class FixedBitSVForwardIndexReaderV2 implements ForwardIndexReader { private final FixedBitIntReader _reader; private final int _numDocs; + private final int _numBitsPerValue; public FixedBitSVForwardIndexReaderV2(PinotDataBuffer dataBuffer, int numDocs, int numBitsPerValue) { _reader = FixedBitIntReader.getReader(dataBuffer, numBitsPerValue); _numDocs = numDocs; + _numBitsPerValue = numBitsPerValue; } @Override @@ -98,4 +101,34 @@ public void readDictIds(int[] docIds, int length, int[] dictIdBuffer, ForwardInd @Override public void close() { } + + @Override + public boolean isBufferByteRangeInfoSupported() { + return true; + } + + @Override + public void recordDocIdByteRanges(int docId, ForwardIndexReaderContext context, List ranges) { + throw new UnsupportedOperationException("Forward index is fixed length type"); + } + + @Override + public boolean isFixedOffsetMappingType() { + return true; + } + + @Override + public long getRawDataStartOffset() { + return 0; + } + + @Override + public int getDocLength() { + return _numBitsPerValue; + } + + @Override + public boolean isDocLengthInBits() { + return true; + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java index cc44bb341b06..f96ed6e878a6 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java @@ -19,6 +19,7 @@ package org.apache.pinot.segment.local.segment.index.readers.forward; import java.nio.ByteBuffer; +import java.util.List; import javax.annotation.Nullable; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; @@ -222,4 +223,33 @@ private long getValueEndOffset(int chunkId, int chunkRowId, long chunkStartOffse } } } + + @Override + public boolean isBufferByteRangeInfoSupported() { + return true; + } + + @Override + public void recordDocIdByteRanges(int docId, ChunkReaderContext context, List ranges) { + if (_isCompressed) { + recordDocIdRanges(docId, context, ranges); + } else { + recordDocIdRangesUncompressed(docId, ROW_OFFSET_SIZE, ranges); + } + } + + @Override + public boolean isFixedOffsetMappingType() { + return false; + } + + @Override + public long getRawDataStartOffset() { + throw new UnsupportedOperationException("Forward index is not fixed length type"); + } + + @Override + public int getDocLength() { + throw new UnsupportedOperationException("Forward index is not fixed length type"); + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkSVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkSVForwardIndexReader.java index 4bda3f809034..1021ec1af25e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkSVForwardIndexReader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkSVForwardIndexReader.java @@ -19,6 +19,7 @@ package org.apache.pinot.segment.local.segment.index.readers.forward; import java.nio.ByteBuffer; +import java.util.List; import javax.annotation.Nullable; import org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkForwardIndexWriter; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; @@ -91,4 +92,39 @@ public double getDouble(int docId, ChunkReaderContext context) { return _rawData.getDouble(docId * Double.BYTES); } } + + @Override + public boolean isBufferByteRangeInfoSupported() { + return true; + } + + @Override + public void recordDocIdByteRanges(int docId, ChunkReaderContext context, @Nullable List ranges) { + if (!_isCompressed) { + // If uncompressed, should use fixed offset + throw new UnsupportedOperationException("Forward index is fixed length type"); + } + recordDocIdRanges(docId, context, ranges); + } + + @Override + public boolean isFixedOffsetMappingType() { + return !_isCompressed; + } + + @Override + public long getRawDataStartOffset() { + if (isFixedOffsetMappingType()) { + return _rawDataStart; + } + throw new UnsupportedOperationException("Forward index is not fixed length type"); + } + + @Override + public int getDocLength() { + if (isFixedOffsetMappingType()) { + return _storedType.size(); + } + throw new UnsupportedOperationException("Forward index is not fixed length type"); + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java index 1fc567cfd47e..1e5b5be4dc7d 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java @@ -19,6 +19,7 @@ package org.apache.pinot.segment.local.segment.index.readers.forward; import java.nio.ByteBuffer; +import java.util.List; import javax.annotation.Nullable; import org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkForwardIndexWriter; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; @@ -94,21 +95,45 @@ public double getDouble(int docId, ChunkReaderContext context) { } } - /** - * Helper method to return the chunk buffer that contains the value at the given document id. - *
    - *
  • If the chunk already exists in the reader context, returns the same.
  • - *
  • Otherwise, loads the chunk for the row, and sets it in the reader context.
  • - *
- * @param docId Document id - * @param context Reader context - * @return Chunk for the row - */ - protected ByteBuffer getChunkBuffer(int docId, ChunkReaderContext context) { - int chunkId = docId >>> _shift; - if (context.getChunkId() == chunkId) { - return context.getChunkBuffer(); + protected int getChunkId(int docId) { + return docId >>> _shift; + } + + @Override + public boolean isBufferByteRangeInfoSupported() { + return true; + } + + @Override + public void recordDocIdByteRanges(int docId, ChunkReaderContext context, List ranges) { + if (_isCompressed) { + recordDocIdRanges(docId, context, ranges); + } else { + // If uncompressed, should use fixed offset + throw new UnsupportedOperationException("Forward index is of fixed length type"); + } + } + + @Override + public boolean isFixedOffsetMappingType() { + return !_isCompressed; + } + + @Override + public long getRawDataStartOffset() { + if (isFixedOffsetMappingType()) { + return _rawDataStart; + } else { + throw new UnsupportedOperationException("Forward index is not fixed length type"); + } + } + + @Override + public int getDocLength() { + if (isFixedOffsetMappingType()) { + return _storedType.size(); + } else { + throw new UnsupportedOperationException("Forward index is not fixed length type"); } - return decompressChunk(chunkId, context); } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java index 4858e790cb21..47c30aec6b1a 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java @@ -24,6 +24,8 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; @@ -56,6 +58,7 @@ public class VarByteChunkForwardIndexReaderV4 private final PinotDataBuffer _metadata; private final PinotDataBuffer _chunks; private final boolean _isSingleValue; + private final long _chunksStartOffset; public VarByteChunkForwardIndexReaderV4(PinotDataBuffer dataBuffer, FieldSpec.DataType storedType, boolean isSingleValue) { @@ -68,6 +71,7 @@ public VarByteChunkForwardIndexReaderV4(PinotDataBuffer dataBuffer, FieldSpec.Da int chunksOffset = dataBuffer.getInt(12); // the file has a BE header for compatability reasons (version selection) but the content is LE _metadata = dataBuffer.view(16, chunksOffset, ByteOrder.LITTLE_ENDIAN); + _chunksStartOffset = chunksOffset; _chunks = dataBuffer.view(chunksOffset, dataBuffer.size(), ByteOrder.LITTLE_ENDIAN); _isSingleValue = isSingleValue; } @@ -97,8 +101,9 @@ public ChunkCompressionType getCompressionType() { @Override public ReaderContext createContext() { return _chunkCompressionType == ChunkCompressionType.PASS_THROUGH ? new UncompressedReaderContext(_chunks, - _metadata) : new CompressedReaderContext(_metadata, _chunks, _chunkDecompressor, _chunkCompressionType, - _targetDecompressedChunkSize); + _metadata, _chunksStartOffset) + : new CompressedReaderContext(_metadata, _chunks, _chunksStartOffset, _chunkDecompressor, _chunkCompressionType, + _targetDecompressedChunkSize); } @Override @@ -263,6 +268,31 @@ public void close() throws IOException { } + @Override + public boolean isBufferByteRangeInfoSupported() { + return true; + } + + @Override + public void recordDocIdByteRanges(int docId, ReaderContext context, List ranges) { + context.recordRangesForDocId(docId, ranges); + } + + @Override + public boolean isFixedOffsetMappingType() { + return false; + } + + @Override + public long getRawDataStartOffset() { + throw new UnsupportedOperationException("Forward index is not fixed length type"); + } + + @Override + public int getDocLength() { + throw new UnsupportedOperationException("Forward index is not fixed length type"); + } + public static abstract class ReaderContext implements ForwardIndexReaderContext { protected final PinotDataBuffer _chunks; @@ -271,10 +301,21 @@ public static abstract class ReaderContext implements ForwardIndexReaderContext protected int _nextDocIdOffset; protected boolean _regularChunk; protected int _numDocsInCurrentChunk; + protected long _chunkStartOffset; + private List _ranges; - protected ReaderContext(PinotDataBuffer metadata, PinotDataBuffer chunks) { + protected ReaderContext(PinotDataBuffer metadata, PinotDataBuffer chunks, long chunkStartOffset) { _chunks = chunks; _metadata = metadata; + _chunkStartOffset = chunkStartOffset; + } + + private void recordRangesForDocId(int docId, List ranges) { + if (docId >= _docIdOffset && docId < _nextDocIdOffset) { + ranges.addAll(_ranges); + } else { + initAndRecordRangesForDocId(docId, ranges); + } } public byte[] getValue(int docId) { @@ -330,14 +371,36 @@ private byte[] decompressAndRead(int docId) } return processChunkAndReadFirstValue(docId, offset, limit); } + + private void initAndRecordRangesForDocId(int docId, List ranges) { + // Due to binary search on metadata buffer, it's simple to record the entire metadata buffer byte ranges + _ranges = new ArrayList<>(); + _ranges.add(new ByteRange(0, (int) _metadata.size())); + long metadataEntry = chunkIndexFor(docId); + int info = _metadata.getInt(metadataEntry); + _docIdOffset = info & 0x7FFFFFFF; + _regularChunk = _docIdOffset == info; + long offset = _metadata.getInt(metadataEntry + Integer.BYTES) & 0xFFFFFFFFL; + long limit; + if (_metadata.size() - METADATA_ENTRY_SIZE > metadataEntry) { + _nextDocIdOffset = _metadata.getInt(metadataEntry + METADATA_ENTRY_SIZE) & 0x7FFFFFFF; + limit = _metadata.getInt(metadataEntry + METADATA_ENTRY_SIZE + Integer.BYTES) & 0xFFFFFFFFL; + } else { + _nextDocIdOffset = Integer.MAX_VALUE; + limit = _chunks.size(); + } + _ranges.add(new ByteRange(_chunkStartOffset + offset, (int) (limit - offset))); + ranges.addAll(_ranges); + } } private static final class UncompressedReaderContext extends ReaderContext { private ByteBuffer _chunk; + private List _ranges; - UncompressedReaderContext(PinotDataBuffer metadata, PinotDataBuffer chunks) { - super(chunks, metadata); + UncompressedReaderContext(PinotDataBuffer metadata, PinotDataBuffer chunks, long chunkStartOffset) { + super(chunks, metadata, chunkStartOffset); } @Override @@ -380,9 +443,9 @@ private static final class CompressedReaderContext extends ReaderContext { private final ChunkDecompressor _chunkDecompressor; private final ChunkCompressionType _chunkCompressionType; - CompressedReaderContext(PinotDataBuffer metadata, PinotDataBuffer chunks, ChunkDecompressor chunkDecompressor, - ChunkCompressionType chunkCompressionType, int targetChunkSize) { - super(metadata, chunks); + CompressedReaderContext(PinotDataBuffer metadata, PinotDataBuffer chunks, long chunkStartOffset, + ChunkDecompressor chunkDecompressor, ChunkCompressionType chunkCompressionType, int targetChunkSize) { + super(metadata, chunks, chunkStartOffset); _chunkDecompressor = chunkDecompressor; _chunkCompressionType = chunkCompressionType; _decompressedBuffer = ByteBuffer.allocateDirect(targetChunkSize).order(ByteOrder.LITTLE_ENDIAN); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java index 235132d44ca7..1d133bb896c3 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java @@ -20,6 +20,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.List; import javax.annotation.Nullable; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; @@ -242,4 +243,33 @@ private long getValueEndOffset(int chunkId, int chunkRowId, long chunkStartOffse } } } + + @Override + public boolean isBufferByteRangeInfoSupported() { + return true; + } + + @Override + public void recordDocIdByteRanges(int docId, ChunkReaderContext context, List ranges) { + if (_isCompressed) { + recordDocIdRanges(docId, context, ranges); + } else { + recordDocIdRangesUncompressed(docId, ROW_OFFSET_SIZE, ranges); + } + } + + @Override + public boolean isFixedOffsetMappingType() { + return false; + } + + @Override + public long getRawDataStartOffset() { + throw new UnsupportedOperationException("Forward index is not of fixed length type"); + } + + @Override + public int getDocLength() { + throw new UnsupportedOperationException("Forward index is not of fixed length type"); + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java index 8728dd53a8cd..9c9f9ce5f102 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java @@ -20,6 +20,7 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.util.List; import javax.annotation.Nullable; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; @@ -195,4 +196,33 @@ private long getValueEndOffset(int chunkId, int chunkRowId, long chunkStartOffse } } } + + @Override + public boolean isBufferByteRangeInfoSupported() { + return true; + } + + @Override + public void recordDocIdByteRanges(int docId, ChunkReaderContext context, List ranges) { + if (_isCompressed) { + recordDocIdRanges(docId, context, ranges); + } else { + recordDocIdRangesUncompressed(docId, ROW_OFFSET_SIZE, ranges); + } + } + + @Override + public boolean isFixedOffsetMappingType() { + return false; + } + + @Override + public long getRawDataStartOffset() { + throw new UnsupportedOperationException("Forward index is not of fixed length type"); + } + + @Override + public int getDocLength() { + throw new UnsupportedOperationException("Forward index is not of fixed length type"); + } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java index a0cf8c7a97ca..6971aa13d523 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.IOException; import java.nio.ByteOrder; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Random; @@ -171,6 +172,19 @@ public void testMV(DataType dataType, List inputs, ToIntFunction sizeo for (int i = 0; i < numDocs; i++) { Assert.assertEquals(inputs.get(i), extractor.extract(reader, context, i, valueBuffer)); } + + // Value byte range test + Assert.assertTrue(reader.isBufferByteRangeInfoSupported()); + Assert.assertFalse(reader.isFixedOffsetMappingType()); + final ForwardIndexReaderContext valueRangeContext = reader.createContext(); + List ranges = new ArrayList<>(); + for (int i = 0; i < numDocs; i++) { + try { + reader.recordDocIdByteRanges(i, valueRangeContext, ranges); + } catch (Exception e) { + Assert.fail("Failed to record byte ranges for docId: " + i, e); + } + } } interface Extractor { diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedBitMVForwardIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedBitMVForwardIndexTest.java index 4cee3cc54411..5493cb6c3dd6 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedBitMVForwardIndexTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedBitMVForwardIndexTest.java @@ -20,12 +20,16 @@ import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Random; import org.apache.commons.io.FileUtils; import org.apache.pinot.segment.local.io.writer.impl.FixedBitMVForwardIndexWriter; import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVForwardIndexReader; import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; +import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -86,6 +90,22 @@ public void testFixedBitMVForwardIndex() } } + // Byte range test + try (PinotDataBuffer dataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(INDEX_FILE); + FixedBitMVForwardIndexReader reader = new FixedBitMVForwardIndexReader(dataBuffer, NUM_DOCS, totalNumValues, + numBitsPerValue); FixedBitMVForwardIndexReader.Context readerContext = reader.createContext()) { + Assert.assertTrue(reader.isBufferByteRangeInfoSupported()); + Assert.assertFalse(reader.isFixedOffsetMappingType()); + List rangeList = new ArrayList<>(); + for (int i = 0; i < NUM_DOCS; i++) { + try { + reader.recordDocIdByteRanges(i, readerContext, rangeList); + } catch (Exception e) { + Assert.fail("Should not throw exception when calling recordDocIdByteRanges()", e); + } + } + } + FileUtils.forceDelete(INDEX_FILE); } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedBitSVForwardIndexReaderTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedBitSVForwardIndexReaderTest.java new file mode 100644 index 000000000000..b392b41677d4 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedBitSVForwardIndexReaderTest.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.index.forward; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Random; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.segment.local.io.writer.impl.FixedBitSVForwardIndexWriter; +import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReader; +import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + +public class FixedBitSVForwardIndexReaderTest { + private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "FixedBitMVForwardIndexTest"); + private static final File INDEX_FILE = + new File(TEMP_DIR, "testColumn" + V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION); + private static final int NUM_DOCS = 100; + private static final Random RANDOM = new Random(); + + @BeforeClass + public void setUp() + throws IOException { + FileUtils.forceMkdir(TEMP_DIR); + } + + @Test + public void testFixedBitMVForwardIndex() + throws Exception { + for (int numBitsPerValue = 1; numBitsPerValue <= 31; numBitsPerValue++) { + // Generate random values + int[] valuesArray = new int[NUM_DOCS]; + int totalNumValues = 0; + int maxValue = numBitsPerValue != 31 ? 1 << numBitsPerValue : Integer.MAX_VALUE; + for (int i = 0; i < NUM_DOCS; i++) { + valuesArray[i] = RANDOM.nextInt(maxValue); + } + + // Create the forward index + try (FixedBitSVForwardIndexWriter writer = new FixedBitSVForwardIndexWriter(INDEX_FILE, NUM_DOCS, + numBitsPerValue)) { + for (int value : valuesArray) { + writer.putDictId(value); + } + } + + // Read the forward index + try (PinotDataBuffer dataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(INDEX_FILE); + FixedBitSVForwardIndexReader reader = new FixedBitSVForwardIndexReader(dataBuffer, NUM_DOCS, + numBitsPerValue)) { + for (int i = 0; i < NUM_DOCS; i++) { + assertEquals(valuesArray[i], reader.getDictId(i, null)); + } + } + + // Byte range test + try (PinotDataBuffer dataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(INDEX_FILE); + FixedBitSVForwardIndexReader reader = new FixedBitSVForwardIndexReader(dataBuffer, NUM_DOCS, + numBitsPerValue)) { + Assert.assertTrue(reader.isBufferByteRangeInfoSupported()); + Assert.assertTrue(reader.isFixedOffsetMappingType()); + Assert.assertEquals(reader.getRawDataStartOffset(), 0); + Assert.assertEquals(reader.getDocLength(), numBitsPerValue); + Assert.assertTrue(reader.isDocLengthInBits()); + + try { + reader.recordDocIdByteRanges(0, null, new ArrayList<>()); + Assert.fail("Should have failed to record byte ranges"); + } catch (UnsupportedOperationException e) { + // expected + Assert.assertEquals(e.getMessage(), "Forward index is fixed length type"); + } + } + + FileUtils.forceDelete(INDEX_FILE); + } + } + + @AfterClass + public void tearDown() + throws IOException { + FileUtils.deleteDirectory(TEMP_DIR); + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java index 43d1e35ad08c..7cfe87039871 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java @@ -102,6 +102,24 @@ public void testInt(ChunkCompressionType compressionType, int version) Assert.assertEquals(fourByteOffsetReader.getInt(i, fourByteOffsetReaderContext), expected[i]); Assert.assertEquals(eightByteOffsetReader.getInt(i, eightByteOffsetReaderContext), expected[i]); } + + + Assert.assertTrue(fourByteOffsetReader.isBufferByteRangeInfoSupported()); + Assert.assertTrue(eightByteOffsetReader.isBufferByteRangeInfoSupported()); + // Validate byte range provider behaviour + if (compressionType == ChunkCompressionType.PASS_THROUGH) { + // For pass through compression, the buffer is fixed offset mapping type + Assert.assertTrue(fourByteOffsetReader.isFixedOffsetMappingType()); + Assert.assertEquals(fourByteOffsetReader.getDocLength(), Integer.BYTES); + Assert.assertFalse(fourByteOffsetReader.isDocLengthInBits()); + + Assert.assertTrue(eightByteOffsetReader.isFixedOffsetMappingType()); + Assert.assertEquals(eightByteOffsetReader.getDocLength(), Integer.BYTES); + Assert.assertFalse(eightByteOffsetReader.isDocLengthInBits()); + } else { + Assert.assertFalse(eightByteOffsetReader.isFixedOffsetMappingType()); + Assert.assertFalse(eightByteOffsetReader.isFixedOffsetMappingType()); + } } FileUtils.deleteQuietly(outFileFourByte); @@ -150,6 +168,23 @@ public void testLong(ChunkCompressionType compressionType, int version) Assert.assertEquals(fourByteOffsetReader.getLong(i, fourByteOffsetReaderContext), expected[i]); Assert.assertEquals(eightByteOffsetReader.getLong(i, eightByteOffsetReaderContext), expected[i]); } + + // Validate byte range provider behaviour + Assert.assertTrue(fourByteOffsetReader.isBufferByteRangeInfoSupported()); + Assert.assertTrue(eightByteOffsetReader.isBufferByteRangeInfoSupported()); + if (compressionType == ChunkCompressionType.PASS_THROUGH) { + // For pass through compression, the buffer is fixed offset mapping type + Assert.assertTrue(fourByteOffsetReader.isFixedOffsetMappingType()); + Assert.assertEquals(fourByteOffsetReader.getDocLength(), Long.BYTES); + Assert.assertFalse(fourByteOffsetReader.isDocLengthInBits()); + + Assert.assertTrue(eightByteOffsetReader.isFixedOffsetMappingType()); + Assert.assertEquals(eightByteOffsetReader.getDocLength(), Long.BYTES); + Assert.assertFalse(eightByteOffsetReader.isDocLengthInBits()); + } else { + Assert.assertFalse(fourByteOffsetReader.isFixedOffsetMappingType()); + Assert.assertFalse(eightByteOffsetReader.isFixedOffsetMappingType()); + } } FileUtils.deleteQuietly(outFileFourByte); @@ -198,6 +233,23 @@ public void testFloat(ChunkCompressionType compressionType, int version) Assert.assertEquals(fourByteOffsetReader.getFloat(i, fourByteOffsetReaderContext), expected[i]); Assert.assertEquals(eightByteOffsetReader.getFloat(i, eightByteOffsetReaderContext), expected[i]); } + + // Validate byte range provider behaviour + Assert.assertTrue(fourByteOffsetReader.isBufferByteRangeInfoSupported()); + Assert.assertTrue(eightByteOffsetReader.isBufferByteRangeInfoSupported()); + if (compressionType == ChunkCompressionType.PASS_THROUGH) { + // For pass through compression, the buffer is fixed offset mapping type + Assert.assertTrue(fourByteOffsetReader.isFixedOffsetMappingType()); + Assert.assertEquals(fourByteOffsetReader.getDocLength(), Float.BYTES); + Assert.assertFalse(fourByteOffsetReader.isDocLengthInBits()); + + Assert.assertTrue(eightByteOffsetReader.isFixedOffsetMappingType()); + Assert.assertEquals(eightByteOffsetReader.getDocLength(), Float.BYTES); + Assert.assertFalse(eightByteOffsetReader.isDocLengthInBits()); + } else { + Assert.assertFalse(fourByteOffsetReader.isFixedOffsetMappingType()); + Assert.assertFalse(eightByteOffsetReader.isFixedOffsetMappingType()); + } } FileUtils.deleteQuietly(outFileFourByte); @@ -246,6 +298,39 @@ public void testDouble(ChunkCompressionType compressionType, int version) Assert.assertEquals(fourByteOffsetReader.getDouble(i, fourByteOffsetReaderContext), expected[i]); Assert.assertEquals(eightByteOffsetReader.getDouble(i, eightByteOffsetReaderContext), expected[i]); } + + // Validate byte range provider behaviour + Assert.assertTrue(fourByteOffsetReader.isBufferByteRangeInfoSupported()); + Assert.assertTrue(eightByteOffsetReader.isBufferByteRangeInfoSupported()); + if (compressionType == ChunkCompressionType.PASS_THROUGH) { + // For pass through compression, the buffer is fixed offset mapping type + Assert.assertTrue(fourByteOffsetReader.isFixedOffsetMappingType()); + Assert.assertEquals(fourByteOffsetReader.getDocLength(), Double.BYTES); + Assert.assertFalse(fourByteOffsetReader.isDocLengthInBits()); + + Assert.assertTrue(eightByteOffsetReader.isFixedOffsetMappingType()); + Assert.assertEquals(eightByteOffsetReader.getDocLength(), Double.BYTES); + Assert.assertFalse(eightByteOffsetReader.isDocLengthInBits()); + } else { + Assert.assertFalse(fourByteOffsetReader.isFixedOffsetMappingType()); + Assert.assertFalse(eightByteOffsetReader.isFixedOffsetMappingType()); + } + + Assert.assertTrue(fourByteOffsetReader.isBufferByteRangeInfoSupported()); + Assert.assertTrue(eightByteOffsetReader.isBufferByteRangeInfoSupported()); + if (compressionType == ChunkCompressionType.PASS_THROUGH) { + // For pass through compression, the buffer is fixed offset mapping type + Assert.assertTrue(fourByteOffsetReader.isFixedOffsetMappingType()); + Assert.assertEquals(fourByteOffsetReader.getDocLength(), Double.BYTES); + Assert.assertFalse(fourByteOffsetReader.isDocLengthInBits()); + + Assert.assertTrue(eightByteOffsetReader.isFixedOffsetMappingType()); + Assert.assertEquals(eightByteOffsetReader.getDocLength(), Double.BYTES); + Assert.assertFalse(eightByteOffsetReader.isDocLengthInBits()); + } else { + Assert.assertFalse(fourByteOffsetReader.isFixedOffsetMappingType()); + Assert.assertFalse(eightByteOffsetReader.isFixedOffsetMappingType()); + } } FileUtils.deleteQuietly(outFileFourByte); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitSVForwardIndexReaderV2Test.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitSVForwardIndexReaderV2Test.java index 07cc843e5ecf..5deb60d90a95 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitSVForwardIndexReaderV2Test.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitSVForwardIndexReaderV2Test.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Random; import org.apache.commons.io.FileUtils; import org.apache.pinot.segment.local.io.util.PinotDataBitSetV2; @@ -107,6 +108,24 @@ public void testFixedBitIntReader() Assert.assertEquals(dictIdBuffer[i], values[_lastSequentialDocIds[i]]); } } + + // Byte range test + try (PinotDataBuffer dataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(indexFile); + FixedBitSVForwardIndexReader reader = new FixedBitSVForwardIndexReader(dataBuffer, NUM_VALUES, + numBits)) { + Assert.assertTrue(reader.isFixedOffsetMappingType()); + Assert.assertEquals(reader.getRawDataStartOffset(), 0); + Assert.assertEquals(reader.getDocLength(), numBits); + Assert.assertTrue(reader.isDocLengthInBits()); + + try { + reader.recordDocIdByteRanges(0, null, new ArrayList<>()); + Assert.fail("Should have failed to record byte ranges"); + } catch (UnsupportedOperationException e) { + // expected + Assert.assertEquals(e.getMessage(), "Forward index is fixed length type"); + } + } } } } diff --git a/pinot-segment-spi/pom.xml b/pinot-segment-spi/pom.xml index 69d98b36d542..a43988ee4ce3 100644 --- a/pinot-segment-spi/pom.xml +++ b/pinot-segment-spi/pom.xml @@ -69,6 +69,10 @@ org.apache.calcite calcite-core + + org.projectlombok + lombok + diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java index 35f7a8111777..5882986edd1a 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java @@ -19,7 +19,11 @@ package org.apache.pinot.segment.spi.index.reader; import java.math.BigDecimal; +import java.util.List; import javax.annotation.Nullable; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.segment.spi.index.IndexReader; import org.apache.pinot.spi.data.FieldSpec.DataType; @@ -916,4 +920,77 @@ default byte[][] getBytesMV(int docId, T context) { default int getNumValuesMV(int docId, T context) { throw new UnsupportedOperationException(); } + + // Functions for recording absolute buffer byte ranges accessed while reading a given docId + + /** + * Returns whether the forward index supports recording the byte ranges accessed while reading a given docId. + * For readers that do support this info, caller should check if the buffer is a {@link isFixedOffsetMappingType()}. + * If yes, the byte range mapping for a docId can be calculated using the {@link getRawDataStartOffset()} and the + * {@link getDocLength()} functions. + * if not, caller should use the {@link recordDocIdByteRanges()} function to get the list of byte ranges accessed + * for a docId. + */ + default boolean isBufferByteRangeInfoSupported() { + return false; + } + + /** + * Returns a list of {@link ByteRange} that represents all the distinct + * buffer byte ranges (absolute offset, sizeInBytes) that are accessed when reading the given (@param docId} + * @param docId to find the range for + * @param context Reader context + * @param ranges List of {@link ByteRange} to which the applicable value ranges will be added + */ + default void recordDocIdByteRanges(int docId, T context, List ranges) { + throw new UnsupportedOperationException(); + } + + /** + * Returns whether the forward index is of fixed length type, and therefore the docId -> byte range mapping is fixed + * @return true if forward index has a fixed mapping of docId -> buffer offsets + * (eg: FixedBitSVForwardIndexReader, FixedByteChunkSVForwardIndexReader (if buffer is uncompressed) etc), false + * otherwise + */ + default boolean isFixedOffsetMappingType() { + throw new UnsupportedOperationException(); + } + + /** + * Returns the base offset of raw data start within the fwd index buffer, if it's of fixed offset mapping type + * @return raw data start offset if the reader is of fixed offset mapping type + */ + default long getRawDataStartOffset() { + throw new UnsupportedOperationException(); + } + + /** + * Returns the length of each entry in the forward index, if it's of fixed offset mapping type + */ + default int getDocLength() { + throw new UnsupportedOperationException(); + } + + /** + * Returns whether the length of each entry in the forward index is in bits, if it's of fixed offset mapping type + */ + default boolean isDocLengthInBits() { + return false; + } + + /** + * This class represents the buffer byte ranges accessed while reading a given docId. + */ + @AllArgsConstructor + @EqualsAndHashCode + @Getter + class ByteRange { + private final long _offset; + private final int _sizeInBytes; + + @Override + public String toString() { + return "Range{" + "_offset=" + _offset + ", _size=" + _sizeInBytes + '}'; + } + } }