Skip to content

Commit

Permalink
Add support to read exact buffer byte ranges corresponding to a given…
Browse files Browse the repository at this point in the history
… forward index doc id (apache#11729)

* Forward index ranges api

---------

Co-authored-by: Saurabh Dubey <[email protected]>
Co-authored-by: Saurabh Dubey <[email protected]>
  • Loading branch information
3 people authored Oct 12, 2023
1 parent 876e58d commit 1b7cb16
Show file tree
Hide file tree
Showing 21 changed files with 868 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pinot.segment.local.io.util;

import java.io.Closeable;
import java.util.List;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;


Expand Down Expand Up @@ -216,6 +218,28 @@ public void unsetBit(int bitOffset) {
_dataBuffer.putByte(byteOffset, (byte) (_dataBuffer.getByte(byteOffset) & (0xFF7F >>> bitOffsetInByte)));
}

/**
 * Scans the bitmap from {@code bitOffset} for the next set bit and records the bitmap bytes that were scanned as a
 * single {@link ForwardIndexReader.ByteRange}.
 *
 * <p>The scan is unbounded: it keeps reading bytes until a non-zero byte is found, so the caller must guarantee that
 * a set bit exists from {@code bitOffset} onward.
 *
 * @param bitOffset bit offset (within this bit set) at which the scan starts
 * @param baseOffset byte offset that is added to this bit set's byte offsets when recording the range
 * @param ranges list that receives the range covering every bitmap byte scanned
 * @return bit offset of the set bit that was found, resolved through the FIRST_BIT_SET lookup table
 */
public int getNextSetBitOffsetRecordRanges(int bitOffset, long baseOffset,
    List<ForwardIndexReader.ByteRange> ranges) {
  int byteOffset = bitOffset / Byte.SIZE;
  long startOffset = baseOffset + byteOffset;
  // Length of the scanned span in BYTES; it grows by one byte for every additional bitmap byte read.
  int size = Byte.BYTES;
  int bitOffsetInFirstByte = bitOffset % Byte.SIZE;
  // Shift out the bits preceding bitOffset so they cannot be matched.
  int firstByte = (_dataBuffer.getByte(byteOffset) << bitOffsetInFirstByte) & BYTE_MASK;
  if (firstByte != 0) {
    ranges.add(new ForwardIndexReader.ByteRange(startOffset, size));
    return bitOffset + FIRST_BIT_SET[firstByte];
  }
  while (true) {
    byteOffset++;
    // Previously incremented by Byte.SIZE (8), which over-reported the range length by 8x per scanned byte.
    size += Byte.BYTES;
    int currentByte = _dataBuffer.getByte(byteOffset) & BYTE_MASK;
    if (currentByte != 0) {
      ranges.add(new ForwardIndexReader.ByteRange(startOffset, size));
      return (byteOffset * Byte.SIZE) | FIRST_BIT_SET[currentByte];
    }
  }
}

public int getNextSetBitOffset(int bitOffset) {
int byteOffset = bitOffset / Byte.SIZE;
int bitOffsetInFirstByte = bitOffset % Byte.SIZE;
Expand All @@ -232,6 +256,32 @@ public int getNextSetBitOffset(int bitOffset) {
}
}

/**
 * Scans the bitmap from {@code bitOffset} for the n-th set bit and records the bitmap bytes that were scanned.
 *
 * <p>The scan is unbounded: it keeps reading bytes until enough set bits have been counted, so the caller must
 * guarantee that at least {@code n} set bits exist from {@code bitOffset} onward.
 *
 * @param bitOffset bit offset (within this bit set) at which the scan starts
 * @param n which set bit to locate, counted from 1 (used as {@code n - 1} to index the NTH_BIT_SET table)
 * @param baseOffset byte offset that is added to this bit set's byte offsets when recording ranges
 * @param ranges list that receives the recorded byte ranges
 * @return bit offset of the n-th set bit, resolved through the NTH_BIT_SET lookup table
 */
public int getNextNthSetBitOffsetAndRecordRanges(int bitOffset, int n, long baseOffset,
    List<ForwardIndexReader.ByteRange> ranges) {
  int byteOffset = bitOffset / Byte.SIZE;
  long startOffset = baseOffset + byteOffset;
  // Length of the scanned span in BYTES. This used to start at Byte.SIZE (8) and grow by 8 per byte, which
  // reported bits as if they were bytes.
  int size = Byte.BYTES;
  int bitOffsetInFirstByte = bitOffset % Byte.SIZE;
  // The first byte is recorded on its own here and is also covered by the spanning range added below.
  // NOTE(review): this looks redundant with the spanning range; confirm whether consumers rely on it.
  ranges.add(new ForwardIndexReader.ByteRange(baseOffset + byteOffset, Byte.BYTES));
  // Shift out the bits preceding bitOffset so they are not counted.
  int firstByte = (_dataBuffer.getByte(byteOffset) << bitOffsetInFirstByte) & BYTE_MASK;
  int numBitsSet = NUM_BITS_SET[firstByte];
  if (numBitsSet >= n) {
    ranges.add(new ForwardIndexReader.ByteRange(startOffset, size));
    return bitOffset + NTH_BIT_SET[n - 1][firstByte];
  }
  while (true) {
    // Discount the set bits already seen, then move on to the next byte.
    n -= numBitsSet;
    byteOffset++;
    size += Byte.BYTES;
    int currentByte = _dataBuffer.getByte(byteOffset) & BYTE_MASK;
    numBitsSet = NUM_BITS_SET[currentByte];
    if (numBitsSet >= n) {
      ranges.add(new ForwardIndexReader.ByteRange(startOffset, size));
      return (byteOffset * Byte.SIZE) | NTH_BIT_SET[n - 1][currentByte];
    }
  }
}

public int getNextNthSetBitOffset(int bitOffset, int n) {
int byteOffset = bitOffset / Byte.SIZE;
int bitOffsetInFirstByte = bitOffset % Byte.SIZE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,15 @@


public class ForwardIndexReaderFactory extends IndexReaderFactory.Default<ForwardIndexConfig, ForwardIndexReader> {
public static final ForwardIndexReaderFactory INSTANCE = new ForwardIndexReaderFactory();
private static volatile ForwardIndexReaderFactory _instance = new ForwardIndexReaderFactory();

/**
 * Replaces the shared factory that {@link #getInstance()} returns.
 *
 * @param factory factory to install; stored as-is, with no null check
 */
public static void setInstance(ForwardIndexReaderFactory factory) {
_instance = factory;
}

/**
 * Returns the shared factory currently held in the static {@code _instance} field.
 *
 * @return the current factory instance
 */
public static ForwardIndexReaderFactory getInstance() {
return _instance;
}

@Override
protected IndexType<ForwardIndexConfig, ForwardIndexReader, ?> getIndexType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, Map<St

@Override
protected IndexReaderFactory<ForwardIndexReader> createReaderFactory() {
return ForwardIndexReaderFactory.INSTANCE;
return ForwardIndexReaderFactory.getInstance();
}

public String getFileExtension(ColumnMetadata columnMetadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.nio.FloatBuffer;
import java.nio.IntBuffer;
import java.nio.LongBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
Expand Down Expand Up @@ -53,6 +55,8 @@ public abstract class BaseChunkForwardIndexReader implements ForwardIndexReader<
protected final int _headerEntryChunkOffsetSize;
protected final PinotDataBuffer _rawData;
protected final boolean _isSingleValue;
protected final int _dataHeaderStart;
protected final int _rawDataStart;

protected BaseChunkForwardIndexReader(PinotDataBuffer dataBuffer, DataType storedType, boolean isSingleValue) {
_dataBuffer = dataBuffer;
Expand Down Expand Up @@ -96,9 +100,11 @@ protected BaseChunkForwardIndexReader(PinotDataBuffer dataBuffer, DataType store
// Slice out the header from the data buffer.
int dataHeaderLength = _numChunks * _headerEntryChunkOffsetSize;
int rawDataStart = dataHeaderStart + dataHeaderLength;
_dataHeaderStart = dataHeaderStart;
_dataHeader = _dataBuffer.view(dataHeaderStart, rawDataStart);

// Useful for uncompressed data.
_rawDataStart = rawDataStart;
_rawData = _dataBuffer.view(rawDataStart, _dataBuffer.size());

_isSingleValue = isSingleValue;
Expand All @@ -115,13 +121,86 @@ protected BaseChunkForwardIndexReader(PinotDataBuffer dataBuffer, DataType store
* @return Chunk for the row
*/
protected ByteBuffer getChunkBuffer(int docId, ChunkReaderContext context) {
int chunkId = docId / _numDocsPerChunk;
int chunkId = getChunkId(docId);
if (context.getChunkId() == chunkId) {
return context.getChunkBuffer();
}
return decompressChunk(chunkId, context);
}

/**
 * Maps a document id to the id of the chunk that holds it, using integer division by {@code _numDocsPerChunk}.
 *
 * @param docId document id
 * @return id of the chunk containing the document
 */
protected int getChunkId(int docId) {
return docId / _numDocsPerChunk;
}

/**
 * Records the byte ranges read to locate and fetch the value of {@code docId} when row offsets are read straight
 * from the data buffer: the chunk header entry, the row's offset entry, whatever is needed to find the value's end,
 * and finally the value bytes themselves.
 *
 * @param docId document id whose ranges are recorded
 * @param rowOffsetSize stride in bytes between consecutive row offset entries inside a chunk
 * @param ranges list that receives the recorded ranges, in the order they are read
 */
protected void recordDocIdRangesUncompressed(int docId, int rowOffsetSize, List<ByteRange> ranges) {
  int chunkId = getChunkId(docId);
  int rowInChunk = docId % _numDocsPerChunk;

  // Every position below is a position in _dataBuffer.
  long chunkStart = getChunkPositionAndRecordRanges(chunkId, ranges);
  long rowOffsetPosition = chunkStart + (long) rowInChunk * rowOffsetSize;
  ranges.add(new ByteRange(rowOffsetPosition, Integer.BYTES));

  // The stored row offset is relative to the start of the chunk.
  long valueStart = chunkStart + _dataBuffer.getInt(rowOffsetPosition);
  long valueEnd = getValueEndOffsetAndRecordRanges(chunkId, rowInChunk, rowOffsetSize, chunkStart, ranges);

  ranges.add(new ByteRange(valueStart, (int) (valueEnd - valueStart)));
}

/**
 * Computes where the value of a row ends in the data buffer and records any byte range read to find that out.
 *
 * <ul>
 *   <li>Last row of the last chunk: the value ends at the end of the data buffer; nothing is recorded.</li>
 *   <li>Last row of any other chunk: the value ends where the next chunk starts; the next chunk's header entry is
 *   recorded.</li>
 *   <li>Any other row: the next row's offset entry is recorded and read. In the last chunk a stored offset of 0
 *   marks an absent row (incomplete chunk), in which case the value ends at the end of the data buffer.</li>
 * </ul>
 *
 * @param chunkId id of the chunk containing the row
 * @param chunkRowId index of the row inside its chunk
 * @param rowOffsetSize stride in bytes between consecutive row offset entries inside a chunk
 * @param chunkStartOffset position of the chunk in the data buffer
 * @param ranges list that receives the recorded ranges
 * @return exclusive end position of the value in the data buffer
 */
protected long getValueEndOffsetAndRecordRanges(int chunkId, int chunkRowId, int rowOffsetSize, long chunkStartOffset,
    List<ByteRange> ranges) {
  boolean inLastChunk = chunkId == _numChunks - 1;
  boolean lastRowOfChunk = chunkRowId == _numDocsPerChunk - 1;

  if (lastRowOfChunk) {
    // No following row offset to consult: the value runs to the end of the buffer or to the next chunk.
    return inLastChunk ? _dataBuffer.size() : getChunkPositionAndRecordRanges(chunkId + 1, ranges);
  }

  long nextRowOffsetPosition = chunkStartOffset + (long) (chunkRowId + 1) * rowOffsetSize;
  ranges.add(new ByteRange(nextRowOffsetPosition, Integer.BYTES));
  int valueEndOffsetInChunk = _dataBuffer.getInt(nextRowOffsetPosition);

  if (inLastChunk && valueEndOffsetInChunk == 0) {
    // Last row in the last chunk (chunk is incomplete, which stores 0 as the offset for the absent rows)
    return _dataBuffer.size();
  }
  return chunkStartOffset + valueEndOffsetInChunk;
}

/**
 * Records the byte ranges needed to read the chunk that contains {@code docId}.
 *
 * <p>If the context already refers to that chunk, the ranges stored on the context are replayed; otherwise they are
 * computed through {@link #recordChunkRanges}.
 *
 * @param docId document id whose chunk ranges are recorded
 * @param context reader context carrying the most recently recorded chunk id and its ranges
 * @param ranges list that receives the recorded ranges
 */
protected void recordDocIdRanges(int docId, ChunkReaderContext context, List<ByteRange> ranges) {
  int targetChunkId = getChunkId(docId);
  if (context.getChunkId() != targetChunkId) {
    recordChunkRanges(targetChunkId, context, ranges);
  } else {
    ranges.addAll(context.getRanges());
  }
}

/**
 * Computes the byte ranges read for a whole chunk (its header entry, the next chunk's header entry when there is
 * one, and the chunk bytes), stores them on the context together with the chunk id, and appends them to
 * {@code ranges}.
 *
 * @param chunkId id of the chunk to record
 * @param context reader context updated with the chunk id and its ranges
 * @param ranges list that receives the recorded ranges
 */
protected void recordChunkRanges(int chunkId, ChunkReaderContext context, List<ByteRange> ranges) {
  List<ByteRange> recorded = new ArrayList<>();
  long chunkStart = getChunkPositionAndRecordRanges(chunkId, recorded);

  // A chunk ends where the next one starts; the last chunk runs to the end of the data buffer.
  boolean isLastChunk = chunkId == (_numChunks - 1);
  long chunkEnd = isLastChunk ? _dataBuffer.size() : getChunkPositionAndRecordRanges(chunkId + 1, recorded);
  recorded.add(new ByteRange(chunkStart, (int) (chunkEnd - chunkStart)));

  context.setChunkId(chunkId);
  context.setRanges(recorded);
  ranges.addAll(recorded);
}

protected ByteBuffer decompressChunk(int chunkId, ChunkReaderContext context) {
int chunkSize;
long chunkPosition = getChunkPosition(chunkId);
Expand Down Expand Up @@ -160,6 +239,18 @@ protected long getChunkPosition(int chunkId) {
}
}

/**
 * Reads the position of a chunk from the data header and records the header entry that was read.
 *
 * <p>Header entries are either {@code int} or {@code long} wide, depending on
 * {@code _headerEntryChunkOffsetSize}; the recorded range is positioned relative to {@code _dataHeaderStart}.
 *
 * @param chunkId id of the chunk to look up
 * @param ranges list that receives the range of the header entry
 * @return chunk position stored in the header entry
 */
protected long getChunkPositionAndRecordRanges(int chunkId, List<ByteRange> ranges) {
  int entryOffset = chunkId * _headerEntryChunkOffsetSize;
  boolean intSizedEntries = _headerEntryChunkOffsetSize == Integer.BYTES;
  int entryWidth = intSizedEntries ? Integer.BYTES : Long.BYTES;

  ranges.add(new ByteRange(_dataHeaderStart + entryOffset, entryWidth));
  if (intSizedEntries) {
    return _dataHeader.getInt(entryOffset);
  }
  return _dataHeader.getLong(entryOffset);
}

@Override
public boolean isDictionaryEncoded() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import lombok.Getter;
import lombok.Setter;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
import org.apache.pinot.segment.spi.memory.CleanerUtil;

Expand All @@ -36,24 +41,21 @@
* </ul>
*/
public class ChunkReaderContext implements ForwardIndexReaderContext {
@Getter
private final ByteBuffer _chunkBuffer;

@Getter
@Setter
private int _chunkId;

@Getter
@Setter
private List<ForwardIndexReader.ByteRange> _ranges;

public ChunkReaderContext(int maxChunkSize) {
_chunkBuffer = ByteBuffer.allocateDirect(maxChunkSize);
_chunkId = -1;
}

public ByteBuffer getChunkBuffer() {
return _chunkBuffer;
}

public int getChunkId() {
return _chunkId;
}

public void setChunkId(int chunkId) {
_chunkId = chunkId;
_ranges = new ArrayList<>();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.segment.index.readers.forward;

import java.util.List;
import org.apache.pinot.segment.local.io.util.FixedBitIntReaderWriter;
import org.apache.pinot.segment.local.io.util.FixedByteValueReaderWriter;
import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
Expand Down Expand Up @@ -58,16 +59,23 @@ public final class FixedBitMVForwardIndexReader implements ForwardIndexReader<Fi
private final int _numValues;
private final int _numDocsPerChunk;

private final long _bitmapReaderStartOffset;
private final long _rawDataReaderStartOffset;
private final int _numBitsPerValue;

public FixedBitMVForwardIndexReader(PinotDataBuffer dataBuffer, int numDocs, int numValues, int numBitsPerValue) {
_numDocs = numDocs;
_numValues = numValues;
_numDocsPerChunk = (int) (Math.ceil((float) PREFERRED_NUM_VALUES_PER_CHUNK / (numValues / numDocs)));
int numChunks = (numDocs + _numDocsPerChunk - 1) / _numDocsPerChunk;
long endOffset = numChunks * Integer.BYTES;
_bitmapReaderStartOffset = endOffset;
_chunkOffsetReader = new FixedByteValueReaderWriter(dataBuffer.view(0L, endOffset));
int bitmapSize = (numValues + Byte.SIZE - 1) / Byte.SIZE;
_bitmapReader = new PinotDataBitSet(dataBuffer.view(endOffset, endOffset + bitmapSize));
endOffset += bitmapSize;
_rawDataReaderStartOffset = endOffset;
_numBitsPerValue = numBitsPerValue;
int rawDataSize = (int) (((long) numValues * numBitsPerValue + Byte.SIZE - 1) / Byte.SIZE);
_rawDataReader =
new FixedBitIntReaderWriter(dataBuffer.view(endOffset, endOffset + rawDataSize), numValues, numBitsPerValue);
Expand Down Expand Up @@ -213,6 +221,71 @@ public void close() {
_rawDataReader.close();
}

/**
 * Always returns {@code true}: this reader implements {@code recordDocIdByteRanges}.
 */
@Override
public boolean isBufferByteRangeInfoSupported() {
return true;
}

/**
 * Records the buffer byte ranges touched when locating and reading the values of {@code docId}: the chunk offset
 * entry (only when a different chunk has to be looked up), the bitmap bytes scanned to find the start and end value
 * indexes, and the raw data bytes holding the bit-packed values.
 *
 * <p>The context is updated with {@code docId} and its exclusive end value index so that a following call for the
 * next document, or a later document in the same chunk, can continue from there.
 *
 * @param docId document id whose ranges are recorded
 * @param context reader context holding the previously processed doc id and its end value index
 * @param ranges list that receives the recorded ranges
 */
@Override
public void recordDocIdByteRanges(int docId, Context context, List<ByteRange> ranges) {
  int contextDocId = context._docId;
  int contextEndOffset = context._endOffset;
  int startIndex;
  if (docId == contextDocId + 1) {
    // Sequential read: this doc starts where the previous one ended.
    startIndex = contextEndOffset;
  } else {
    int chunkId = docId / _numDocsPerChunk;
    if (docId > contextDocId && chunkId == contextDocId / _numDocsPerChunk) {
      // Same chunk
      startIndex =
          _bitmapReader.getNextNthSetBitOffsetAndRecordRanges(contextEndOffset + 1, docId - contextDocId - 1,
              _bitmapReaderStartOffset, ranges);
    } else {
      // Different chunk
      // Chunk offsets are Integer.BYTES-wide entries at the start of the buffer, so the entry for chunkId is at
      // byte chunkId * Integer.BYTES (previously recorded at byte chunkId, which is only right for chunk 0).
      ranges.add(new ByteRange((long) chunkId * Integer.BYTES, Integer.BYTES));
      int chunkOffset = _chunkOffsetReader.getInt(chunkId);
      int indexInChunk = docId % _numDocsPerChunk;
      if (indexInChunk == 0) {
        startIndex = chunkOffset;
      } else {
        startIndex = _bitmapReader.getNextNthSetBitOffsetAndRecordRanges(chunkOffset + 1, indexInChunk,
            _bitmapReaderStartOffset, ranges);
      }
    }
  }
  int endIndex;
  if (docId == _numDocs - 1) {
    endIndex = _numValues;
  } else {
    endIndex = _bitmapReader.getNextSetBitOffsetRecordRanges(startIndex + 1, _bitmapReaderStartOffset, ranges);
  }

  // Values are bit-packed, so the first value may start in the middle of a byte. The range must run from the byte
  // holding the first bit to the byte holding the last bit. Sizing it from the value bit count alone (as before)
  // drops the trailing byte whenever the start is not byte-aligned.
  long startBitOffset = (long) startIndex * _numBitsPerValue;
  long endBitOffset = (long) endIndex * _numBitsPerValue;
  long byteStartOffset = startBitOffset / Byte.SIZE;
  long byteEndOffset = (endBitOffset + Byte.SIZE - 1) / Byte.SIZE;
  int size = (int) (byteEndOffset - byteStartOffset);

  ranges.add(new ByteRange(_rawDataReaderStartOffset + byteStartOffset, size));

  // Update context
  context._docId = docId;
  context._endOffset = endIndex;
}

/**
 * Always returns {@code false} for this reader; the fixed-length accessors {@code getRawDataStartOffset()} and
 * {@code getDocLength()} throw accordingly.
 */
@Override
public boolean isFixedOffsetMappingType() {
return false;
}

/**
 * Not supported by this reader.
 *
 * @throws UnsupportedOperationException always, because this forward index is not a fixed length type
 */
@Override
public long getRawDataStartOffset() {
throw new UnsupportedOperationException("Forward index is not fixed length type");
}

/**
 * Not supported by this reader.
 *
 * @throws UnsupportedOperationException always, because this forward index is not a fixed length type
 */
@Override
public int getDocLength() {
throw new UnsupportedOperationException("Forward index is not fixed length type");
}

public static class Context implements ForwardIndexReaderContext {
private int _docId = -1;
// Exclusive
Expand Down
Loading

0 comments on commit 1b7cb16

Please sign in to comment.